From 0999dbe12ed90a84b4fac05568e8ec5683fe7dba Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Wed, 3 Apr 2024 13:01:15 -0700 Subject: [PATCH 1/6] Adding logs to Tron to indicate start and scheduling times --- bin/trond | 5 +++-- tests/mcp_test.py | 15 ++++++++----- tron/core/actionrun.py | 8 +++++++ tron/core/job_collection.py | 12 +++++++++- tron/core/job_scheduler.py | 2 +- tron/core/jobrun.py | 1 + tron/eventbus.py | 1 + tron/mcp.py | 30 ++++++++++++++++++------- tron/serialize/runstate/statemanager.py | 3 +++ tron/trondaemon.py | 23 ++++++++++++++----- 10 files changed, 77 insertions(+), 23 deletions(-) diff --git a/bin/trond b/bin/trond index 28e807da6..d5490fba3 100755 --- a/bin/trond +++ b/bin/trond @@ -4,6 +4,7 @@ import argparse import faulthandler import logging import os +import time import traceback import pkg_resources @@ -168,10 +169,10 @@ def setup_environment(args): def main(): args = parse_cli() - + tron_start_time = time.time() setup_environment(args) trond = trondaemon.TronDaemon(args) - trond.run() + trond.run(tron_start_time=tron_start_time) if __name__ == "__main__": diff --git a/tests/mcp_test.py b/tests/mcp_test.py index ec4431eb6..eb8d83250 100644 --- a/tests/mcp_test.py +++ b/tests/mcp_test.py @@ -49,22 +49,23 @@ def test_reconfigure_namespace(self): self.mcp._load_config.assert_called_with(reconfigure=True, namespace_to_reconfigure="foo") @pytest.mark.parametrize( - "reconfigure,namespace", + "reconfigure,namespace,tron_start_time", [ - (False, None), - (True, None), - (True, "foo"), + (False, None, None), + (True, None, None), + (True, "foo", 5), ], ) - def test_load_config(self, reconfigure, namespace): + def test_load_config(self, reconfigure, namespace, tron_start_time): autospec_method(self.mcp.apply_config) self.mcp.config = mock.create_autospec(manager.ConfigManager) - self.mcp._load_config(reconfigure, namespace) + self.mcp._load_config(reconfigure, namespace, tron_start_time) self.mcp.state_watcher.disabled.assert_called_with() self.mcp.apply_config.assert_called_with( self.mcp.config.load.return_value, reconfigure=reconfigure, namespace_to_reconfigure=namespace, + tron_start_time=tron_start_time, ) @pytest.mark.parametrize( @@ -163,6 +164,7 @@ def teardown_mcp(self): def test_restore_state(self, mock_cluster_repo): job_state_data = {"1": "things", "2": "things"} mesos_state_data = {"3": "things", "4": "things"} + tron_start_time = None state_data = { "mesos_state": mesos_state_data, "job_state": job_state_data, @@ -174,6 +176,7 @@ def test_restore_state(self, mock_cluster_repo): self.mcp.jobs.restore_state.assert_called_with( job_state_data, action_runner, + tron_start_time, ) diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 27d2d90f7..842d5b192 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -301,6 +301,7 @@ def __init__( on_upstream_rerun=None, trigger_timeout_timestamp=None, original_command=None, + start_schedule_jobs=True, ): super().__init__() self.job_run_id = maybe_decode(job_run_id) @@ -1134,6 +1135,8 @@ def handle_action_command_state_change(self, action_command, event, event_data=N class KubernetesActionRun(ActionRun, Observer): """An ActionRun that executes the command on a Kubernetes cluster.""" + start_schedule_jobs = True + def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: """ Attempt to run a given ActionRunAttempt on the configured Kubernetes cluster. 
@@ -1192,6 +1195,11 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: self.watch(task) try: + if self.start_schedule_jobs: + log.info( + f"We will start submitting tasks (i.e scheduling jobs). Starting with {task.get_kubernetes_id()}" + ) + self.start_schedule_jobs = False k8s_cluster.submit(task) except Exception: log.exception(f"Unable to submit task for ActionRun {self.id}") diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index ed882493d..6e926b6fa 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -1,4 +1,5 @@ import logging +import time from tron.core.job import Job from tron.utils import collections @@ -40,6 +41,8 @@ def reconfigure_filter(config): else: return config.namespace == namespace_to_reconfigure + # The below statements don't get called until self.state_watcher.watch_all is called + # in apply_config. This will go through the job's configs and build a scheduler for them seq = (factory.build(config) for config in job_configs.values() if reconfigure_filter(config)) return map_to_job_and_schedule(filter(self.add, seq)) @@ -66,7 +69,14 @@ def update(self, new_job_scheduler): job_scheduler.schedule_reconfigured() return True - def restore_state(self, job_state_data, config_action_runner): + def restore_state(self, job_state_data, config_action_runner, tron_start_time=None): + # we will start looping through the jobs and their runs and restore + # state for each run in for a job and as we restore the state + # we will also schedule the next runs for that job + if tron_start_time is not None: + log.info( + f"Tron is restoring state for jobs and will start scheduling them! Time elapsed since Tron started {time.time()- tron_start_time}" + ) for name, state in job_state_data.items(): self.jobs[name].restore_state(state, config_action_runner) log.info(f"Loaded state for {len(job_state_data)} jobs") diff --git a/tron/core/job_scheduler.py b/tron/core/job_scheduler.py index 434bb479f..73184a11a 100644 --- a/tron/core/job_scheduler.py +++ b/tron/core/job_scheduler.py @@ -249,7 +249,7 @@ def __init__(self, context, output_stream_dir, time_zone, action_runner, job_gra self.job_graph = job_graph def build(self, job_config): - log.debug(f"Building new job {job_config.name}") + log.debug(f"Building new job scheduler {job_config.name}") output_path = filehandler.OutputPath(self.output_stream_dir) time_zone = job_config.time_zone or self.time_zone scheduler = scheduler_from_config(job_config.schedule, time_zone) diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index 11f37b7e7..d73021850 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -462,6 +462,7 @@ def get_first_queued(self, node=None): ) def get_scheduled(self): + # find the scheduled run for the job and return it return [r for r in self.runs if r.state == ActionRun.SCHEDULED] def next_run_num(self): diff --git a/tron/eventbus.py b/tron/eventbus.py index de9ad28a3..585ba93c7 100644 --- a/tron/eventbus.py +++ b/tron/eventbus.py @@ -140,6 +140,7 @@ def _has_event(self, event_id): def sync_load_log(self): started = time.time() + # we are loading the pickle object from the "current" file with open(self.log_current, "rb") as f: self.event_log = pickle.load(f) duration = time.time() - started diff --git a/tron/mcp.py b/tron/mcp.py index 3a9620e6e..48d93439f 100644 --- a/tron/mcp.py +++ b/tron/mcp.py @@ -22,6 +22,8 @@ def apply_master_configuration(mapping, master_config): def get_config_value(seq): return [getattr(master_config, item) for item 
in seq] + # mapping consists of different functions in MCP mapped to their config from tronfig in Master.yaml + # for example, we will have MasterControlProgram.configure_eventbus function mapped to eventbus_enabled option for entry in mapping: func, args = entry[0], get_config_value(entry[1:]) func(*args) @@ -38,7 +40,8 @@ def __init__(self, working_dir, config_path): self.context = command_context.CommandContext() self.state_watcher = statemanager.StateChangeWatcher() self.boot_time = time.time() - log.info("initialized") + current_time = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()) + log.info(f"Initialized. Tron started on {current_time}!") def shutdown(self): EventBus.shutdown() @@ -53,30 +56,34 @@ def reconfigure(self, namespace=None): log.exception(f"reconfigure failure: {e.__class__.__name__}: {e}") raise e - def _load_config(self, reconfigure=False, namespace_to_reconfigure=None): + def _load_config(self, reconfigure=False, namespace_to_reconfigure=None, tron_start_time=None): """Read config data and apply it.""" with self.state_watcher.disabled(): self.apply_config( self.config.load(), reconfigure=reconfigure, namespace_to_reconfigure=namespace_to_reconfigure, + tron_start_time=tron_start_time, ) - def initial_setup(self): + def initial_setup(self, tron_start_time=None): """When the MCP is initialized the config is applied before the state. In this case jobs shouldn't be scheduled until the state is applied. """ - self._load_config() + # The job schedules have been applied in first step + self._load_config(tron_start_time=tron_start_time) + # Now the jobs will actually get scheduled once the state for action runs are restored self.restore_state( actioncommand.create_action_runner_factory_from_config( self.config.load().get_master().action_runner, ), + tron_start_time=tron_start_time, ) # Any job with existing state would have been scheduled already. Jobs # without any state will be scheduled here. self.jobs.run_queue_schedule() - def apply_config(self, config_container, reconfigure=False, namespace_to_reconfigure=None): + def apply_config(self, config_container, reconfigure=False, namespace_to_reconfigure=None, tron_start_time=None): """Apply a configuration.""" master_config_directives = [ (self.update_state_watcher_config, "state_persistence"), @@ -103,6 +110,7 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi # TODO: unify NOTIFY_STATE_CHANGE and simplify this self.job_graph = JobGraph(config_container) + # Builds a job scheduler factory for Tron jobs, which will schedule internally in Tron the jobs factory = self.build_job_scheduler_factory(master_config, self.job_graph) updated_jobs = self.jobs.update_from_config( config_container.get_jobs(), @@ -110,9 +118,15 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi reconfigure, namespace_to_reconfigure, ) + # We will build the schedulers once the watcher is invoked + if tron_start_time is not None: + log.info( + f"Tron built the schedulers for Tron jobs internally! 
Time elapsed since Tron started {time.time() - tron_start_time}s" + ) self.state_watcher.watch_all(updated_jobs, [Job.NOTIFY_STATE_CHANGE, Job.NOTIFY_NEW_RUN]) def build_job_scheduler_factory(self, master_config, job_graph): + """Creates an action runner if option specified in tronfigs and job scheduler instances for Tron jobs""" output_stream_dir = master_config.output_stream_dir or self.working_dir action_runner = actioncommand.create_action_runner_factory_from_config( master_config.action_runner, @@ -150,15 +164,15 @@ def get_job_collection(self): def get_config_manager(self): return self.config - def restore_state(self, action_runner): + def restore_state(self, action_runner, tron_start_time=None): """Use the state manager to retrieve to persisted state and apply it to the configured Jobs. """ - log.info("restoring") + log.info("restoring from DynamoDB") states = self.state_watcher.restore(self.jobs.get_names()) MesosClusterRepository.restore_state(states.get("mesos_state", {})) - self.jobs.restore_state(states.get("job_state", {}), action_runner) + self.jobs.restore_state(states.get("job_state", {}), action_runner, tron_start_time=tron_start_time) self.state_watcher.save_metadata() def __str__(self): diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py index d5c24fdfa..53af32594 100644 --- a/tron/serialize/runstate/statemanager.py +++ b/tron/serialize/runstate/statemanager.py @@ -146,6 +146,8 @@ def restore(self, job_names, skip_validation=False): self._restore_metadata() jobs = self._restore_dicts(runstate.JOB_STATE, job_names) + # jobs should be a dictionary that contains job name and number of runs + # {'Master.k8s': {'run_nums':[0], 'enabled': True}, 'Master.cits_test_frequent_1': {'run_nums'; [1,0], 'enabled': True}} for job_name, job_state in jobs.items(): job_state["runs"] = self._restore_runs_for_job(job_name, job_state) frameworks = self._restore_dicts(runstate.MESOS_STATE, ["frameworks"]) @@ -263,6 +265,7 @@ def update_from_config(self, state_config): return False self.shutdown() + # The function below will spin up a thread that will be saving into dynamodb, which will run as daemon self.state_manager = PersistenceManagerFactory.from_config( state_config, ) diff --git a/tron/trondaemon.py b/tron/trondaemon.py index 7c3591f8a..175c29450 100644 --- a/tron/trondaemon.py +++ b/tron/trondaemon.py @@ -75,7 +75,7 @@ def __init__(self, options): self.signals = {signal.SIGINT: signal.default_int_handler} self.manhole_sock = f"{self.options.working_dir}/manhole.sock" - def run(self): + def run(self, tron_start_time=None): with no_daemon_context(self.working_dir, self.lock_file, self.signals): signal_map = { signal.SIGHUP: self._handle_reconfigure, @@ -85,11 +85,23 @@ def run(self): signal.SIGUSR1: self._handle_debug, } signal.pthread_sigmask(signal.SIG_BLOCK, signal_map.keys()) - - self._run_mcp() + log.info("Starting the setup process of mcp, api, manhole and runnning the reactor") + self._run_mcp(tron_start_time=tron_start_time) + log.info( + f"The master control plane has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" + ) self._run_www_api() + log.info( + f"The Tron API has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" + ) self._run_manhole() + log.info( + f"The manhole has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" + ) self._run_reactor() + log.info( + f"Twisted reactor has started. 
The Tron API should be up and ready now to receive requests. Time elapsed since Tron started {time.time() - tron_start_time}s" + ) while True: signum = signal.sigwait(list(signal_map.keys())) @@ -117,7 +129,7 @@ def _run_www_api(self): port = self.options.listen_port reactor.listenTCP(port, site, interface=self.options.listen_host) - def _run_mcp(self): + def _run_mcp(self, tron_start_time=None): # Local import required because of reactor import in mcp from tron import mcp @@ -126,7 +138,7 @@ def _run_mcp(self): self.mcp = mcp.MasterControlProgram(working_dir, config_path) try: - self.mcp.initial_setup() + self.mcp.initial_setup(tron_start_time=tron_start_time) except Exception as e: msg = "Error in configuration %s: %s" log.exception(msg % (config_path, e)) @@ -134,6 +146,7 @@ def _run_mcp(self): def _run_reactor(self): """Run the twisted reactor.""" + # Starts Tron server threading.Thread( target=reactor.run, daemon=True, From b427e1470035775dcf41d98805cac064e1f80337 Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Thu, 4 Apr 2024 12:08:26 -0700 Subject: [PATCH 2/6] Addressed reviews on the PR --- bin/trond | 4 +-- tests/mcp_reconfigure_test.py | 3 +- tests/mcp_test.py | 34 +++++++++------------ tron/core/actionrun.py | 7 ++--- tron/core/job_collection.py | 17 ++++++----- tron/core/jobrun.py | 3 +- tron/eventbus.py | 1 - tron/kubernetes.py | 3 ++ tron/mcp.py | 39 ++++++++++++------------- tron/serialize/runstate/statemanager.py | 7 +++-- tron/trondaemon.py | 27 ++++++++--------- 11 files changed, 67 insertions(+), 78 deletions(-) diff --git a/bin/trond b/bin/trond index d5490fba3..8d505e367 100755 --- a/bin/trond +++ b/bin/trond @@ -169,10 +169,10 @@ def setup_environment(args): def main(): args = parse_cli() - tron_start_time = time.time() + boot_time = time.time() setup_environment(args) trond = trondaemon.TronDaemon(args) - trond.run(tron_start_time=tron_start_time) + trond.run(boot_time=boot_time) if __name__ == "__main__": diff --git a/tests/mcp_reconfigure_test.py b/tests/mcp_reconfigure_test.py index b179a28ab..39fa4d3e0 100644 --- a/tests/mcp_reconfigure_test.py +++ b/tests/mcp_reconfigure_test.py @@ -1,6 +1,7 @@ """Tests for reconfiguring mcp.""" import os import tempfile +import time import pytest @@ -179,7 +180,7 @@ def _get_runs_to_schedule(self, sched): @setup def setup_mcp(self): self.test_dir = tempfile.mkdtemp() - self.mcp = mcp.MasterControlProgram(self.test_dir, "config") + self.mcp = mcp.MasterControlProgram(self.test_dir, "config", time.time()) config = {schema.MASTER_NAMESPACE: self._get_config(0, self.test_dir)} container = config_parse.ConfigContainer.create(config) self.mcp.apply_config(container) diff --git a/tests/mcp_test.py b/tests/mcp_test.py index eb8d83250..903613e51 100644 --- a/tests/mcp_test.py +++ b/tests/mcp_test.py @@ -1,5 +1,6 @@ import shutil import tempfile +import time from unittest import mock import pytest @@ -25,10 +26,8 @@ class TestMasterControlProgram: def setup_mcp(self): self.working_dir = tempfile.mkdtemp() self.config_path = tempfile.mkdtemp() - self.mcp = mcp.MasterControlProgram( - self.working_dir, - self.config_path, - ) + self.boot_time = time.time() + self.mcp = mcp.MasterControlProgram(self.working_dir, self.config_path, self.boot_time) self.mcp.state_watcher = mock.create_autospec( statemanager.StateChangeWatcher, ) @@ -49,23 +48,22 @@ def test_reconfigure_namespace(self): self.mcp._load_config.assert_called_with(reconfigure=True, namespace_to_reconfigure="foo") @pytest.mark.parametrize( - 
"reconfigure,namespace,tron_start_time", + "reconfigure,namespace", [ - (False, None, None), - (True, None, None), - (True, "foo", 5), + (False, None), + (True, None), + (True, "foo"), ], ) - def test_load_config(self, reconfigure, namespace, tron_start_time): + def test_load_config(self, reconfigure, namespace): autospec_method(self.mcp.apply_config) self.mcp.config = mock.create_autospec(manager.ConfigManager) - self.mcp._load_config(reconfigure, namespace, tron_start_time) + self.mcp._load_config(reconfigure, namespace) self.mcp.state_watcher.disabled.assert_called_with() self.mcp.apply_config.assert_called_with( self.mcp.config.load.return_value, reconfigure=reconfigure, namespace_to_reconfigure=namespace, - tron_start_time=tron_start_time, ) @pytest.mark.parametrize( @@ -146,10 +144,8 @@ class TestMasterControlProgramRestoreState(TestCase): def setup_mcp(self): self.working_dir = tempfile.mkdtemp() self.config_path = tempfile.mkdtemp() - self.mcp = mcp.MasterControlProgram( - self.working_dir, - self.config_path, - ) + self.boot_time = None + self.mcp = mcp.MasterControlProgram(self.working_dir, self.config_path, self.boot_time) self.mcp.jobs = mock.create_autospec(JobCollection) self.mcp.state_watcher = mock.create_autospec( statemanager.StateChangeWatcher, @@ -164,7 +160,7 @@ def teardown_mcp(self): def test_restore_state(self, mock_cluster_repo): job_state_data = {"1": "things", "2": "things"} mesos_state_data = {"3": "things", "4": "things"} - tron_start_time = None + boot_time = None state_data = { "mesos_state": mesos_state_data, "job_state": job_state_data, @@ -173,11 +169,7 @@ def test_restore_state(self, mock_cluster_repo): action_runner = mock.Mock() self.mcp.restore_state(action_runner) mock_cluster_repo.restore_state.assert_called_with(mesos_state_data) - self.mcp.jobs.restore_state.assert_called_with( - job_state_data, - action_runner, - tron_start_time, - ) + self.mcp.jobs.restore_state.assert_called_with(job_state_data, action_runner, boot_time) if __name__ == "__main__": diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 842d5b192..bce62c2f5 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -301,7 +301,6 @@ def __init__( on_upstream_rerun=None, trigger_timeout_timestamp=None, original_command=None, - start_schedule_jobs=True, ): super().__init__() self.job_run_id = maybe_decode(job_run_id) @@ -1135,8 +1134,6 @@ def handle_action_command_state_change(self, action_command, event, event_data=N class KubernetesActionRun(ActionRun, Observer): """An ActionRun that executes the command on a Kubernetes cluster.""" - start_schedule_jobs = True - def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: """ Attempt to run a given ActionRunAttempt on the configured Kubernetes cluster. @@ -1195,11 +1192,11 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: self.watch(task) try: - if self.start_schedule_jobs: + if k8s_cluster.start_schedule_jobs: log.info( f"We will start submitting tasks (i.e scheduling jobs). 
Starting with {task.get_kubernetes_id()}"
                 )
-                self.start_schedule_jobs = False
+                k8s_cluster.start_schedule_jobs = False
             k8s_cluster.submit(task)
         except Exception:
             log.exception(f"Unable to submit task for ActionRun {self.id}")
diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py
index 6e926b6fa..e180337ee 100644
--- a/tron/core/job_collection.py
+++ b/tron/core/job_collection.py
@@ -41,8 +41,7 @@ def reconfigure_filter(config):
         else:
             return config.namespace == namespace_to_reconfigure
 
-        # The below statements don't get called until self.state_watcher.watch_all is called
-        # in apply_config. This will go through the job's configs and build a scheduler for them
+        # NOTE: as this is a generator expression, we will only go through job configs and build a scheduler for them once something iterates over us (i.e., once `self.state_watcher.watch_all()` is called)
         seq = (factory.build(config) for config in job_configs.values() if reconfigure_filter(config))
         return map_to_job_and_schedule(filter(self.add, seq))
 
@@ -69,13 +68,15 @@ def update(self, new_job_scheduler):
         job_scheduler.schedule_reconfigured()
         return True
 
-    def restore_state(self, job_state_data, config_action_runner, tron_start_time=None):
-        # we will start looping through the jobs and their runs and restore
-        # state for each run in for a job and as we restore the state
-        # we will also schedule the next runs for that job
-        if tron_start_time is not None:
+    def restore_state(self, job_state_data, config_action_runner, boot_time=None):
+        """
+        Loops through the jobs and their runs in order to restore
+        state for each run. As we restore state, we will also schedule the next
+        runs for each job
+        """
+        if boot_time is not None:
             log.info(
-                f"Tron is restoring state for jobs and will start scheduling them! Time elapsed since Tron started {time.time()- tron_start_time}"
+                f"Tron is restoring state for jobs and will start scheduling them! Time elapsed since Tron started {time.time()- boot_time}"
             )
         for name, state in job_state_data.items():
             self.jobs[name].restore_state(state, config_action_runner)
         log.info(f"Loaded state for {len(job_state_data)} jobs")
diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py
index d73021850..b34d23ca9 100644
--- a/tron/core/jobrun.py
+++ b/tron/core/jobrun.py
@@ -462,7 +462,8 @@ def get_first_queued(self, node=None):
         )
 
     def get_scheduled(self):
-        # find the scheduled run for the job and return it
+        # Find the scheduled runs for the jobs and return them
+        # in most cases, there should just be a single run - but it's possible that a delayed job could have N scheduled runs built up
         return [r for r in self.runs if r.state == ActionRun.SCHEDULED]
 
     def next_run_num(self):
diff --git a/tron/eventbus.py b/tron/eventbus.py
index 585ba93c7..de9ad28a3 100644
--- a/tron/eventbus.py
+++ b/tron/eventbus.py
@@ -140,7 +140,6 @@ def _has_event(self, event_id):
 
     def sync_load_log(self):
         started = time.time()
-        # we are loading the pickle object from the "current" file
         with open(self.log_current, "rb") as f:
             self.event_log = pickle.load(f)
         duration = time.time() - started
diff --git a/tron/kubernetes.py b/tron/kubernetes.py
index 8f07f93b3..97bedd814 100644
--- a/tron/kubernetes.py
+++ b/tron/kubernetes.py
@@ -279,12 +279,15 @@ def __init__(
         enabled: bool = True,
         default_volumes: Optional[List[ConfigVolume]] = None,
         pod_launch_timeout: Optional[int] = None,
+        start_schedule_jobs: bool = True,
     ):
         # general k8s config
         self.kubeconfig_path = kubeconfig_path
         self.enabled = enabled
         self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
         self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
+        # This flag will be used to indicate in logs the first task we start scheduling and running on Kubernetes
+        self.start_schedule_jobs = start_schedule_jobs
 
         # creating a task_proc executor has a couple steps:
         # * create a TaskProcessor
diff --git a/tron/mcp.py b/tron/mcp.py
index 48d93439f..40f93a6f2 100644
--- a/tron/mcp.py
+++ b/tron/mcp.py
@@ -22,7 +22,7 @@ def apply_master_configuration(mapping, master_config):
     def get_config_value(seq):
         return [getattr(master_config, item) for item in seq]
 
-    # mapping consists of different functions in MCP mapped to their config from tronfig in Master.yaml
+    # Map various MASTER.yaml config options to functions that will apply said options
     # for example, we will have MasterControlProgram.configure_eventbus function mapped to eventbus_enabled option
     for entry in mapping:
         func, args = entry[0], get_config_value(entry[1:])
@@ -32,15 +32,15 @@ def get_config_value(seq):
 class MasterControlProgram:
     """Central state object for the Tron daemon."""
 
-    def __init__(self, working_dir, config_path):
+    def __init__(self, working_dir, config_path, boot_time=None):
         super().__init__()
         self.jobs = JobCollection()
         self.working_dir = working_dir
         self.config = manager.ConfigManager(config_path)
         self.context = command_context.CommandContext()
         self.state_watcher = statemanager.StateChangeWatcher()
-        self.boot_time = time.time()
-        current_time = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())
+        self.boot_time = boot_time
+        current_time = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(boot_time))
         log.info(f"Initialized. 
Tron started on {current_time}!")
 
     def shutdown(self):
         EventBus.shutdown()
@@ -56,34 +56,32 @@ def reconfigure(self, namespace=None):
             log.exception(f"reconfigure failure: {e.__class__.__name__}: {e}")
             raise e
 
-    def _load_config(self, reconfigure=False, namespace_to_reconfigure=None, tron_start_time=None):
+    def _load_config(self, reconfigure=False, namespace_to_reconfigure=None):
         """Read config data and apply it."""
         with self.state_watcher.disabled():
             self.apply_config(
                 self.config.load(),
                 reconfigure=reconfigure,
                 namespace_to_reconfigure=namespace_to_reconfigure,
-                tron_start_time=tron_start_time,
             )
 
-    def initial_setup(self, tron_start_time=None):
+    def initial_setup(self):
         """When the MCP is initialized the config is applied before the state.
         In this case jobs shouldn't be scheduled until the state is applied.
         """
-        # The job schedules have been applied in first step
-        self._load_config(tron_start_time=tron_start_time)
-        # Now the jobs will actually get scheduled once the state for action runs are restored
+        # The job schedule factories will be created in the function below
+        self._load_config()
+        # Jobs will also get scheduled (internally) once the state for action runs is restored in restore_state
         self.restore_state(
             actioncommand.create_action_runner_factory_from_config(
                 self.config.load().get_master().action_runner,
             ),
-            tron_start_time=tron_start_time,
         )
         # Any job with existing state would have been scheduled already. Jobs
         # without any state will be scheduled here.
         self.jobs.run_queue_schedule()
 
-    def apply_config(self, config_container, reconfigure=False, namespace_to_reconfigure=None, tron_start_time=None):
+    def apply_config(self, config_container, reconfigure=False, namespace_to_reconfigure=None):
         """Apply a configuration."""
         master_config_directives = [
             (self.update_state_watcher_config, "state_persistence"),
@@ -110,7 +108,7 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi
 
         # TODO: unify NOTIFY_STATE_CHANGE and simplify this
         self.job_graph = JobGraph(config_container)
-        # Builds a job scheduler factory for Tron jobs, which will schedule internally in Tron the jobs
+        # This factory is how Tron internally manages scheduling jobs
         factory = self.build_job_scheduler_factory(master_config, self.job_graph)
         updated_jobs = self.jobs.update_from_config(
             config_container.get_jobs(),
@@ -119,14 +117,13 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi
             namespace_to_reconfigure,
         )
         # We will build the schedulers once the watcher is invoked
-        if tron_start_time is not None:
-            log.info(
-                f"Tron built the schedulers for Tron jobs internally! Time elapsed since Tron started {time.time() - tron_start_time}s"
-            )
+        log.info(
+            f"Tron built the schedulers for Tron jobs internally! Time elapsed since Tron started {time.time() - self.boot_time}s"
+        )
         self.state_watcher.watch_all(updated_jobs, [Job.NOTIFY_STATE_CHANGE, Job.NOTIFY_NEW_RUN])
 
     def build_job_scheduler_factory(self, master_config, job_graph):
-        """Creates an action runner if option specified in tronfigs and job scheduler instances for Tron jobs"""
+        """Creates a JobSchedulerFactory, which is how Tron tracks job schedules internally"""
         output_stream_dir = master_config.output_stream_dir or self.working_dir
         action_runner = actioncommand.create_action_runner_factory_from_config(
             master_config.action_runner,
@@ -164,15 +161,15 @@ def get_job_collection(self):
     def get_config_manager(self):
         return self.config
 
-    def restore_state(self, action_runner, tron_start_time=None):
+    def restore_state(self, action_runner):
         """Use the state manager to retrieve to persisted state and apply
         it to the configured Jobs.
         """
-        log.info("restoring from DynamoDB")
+        log.info("Restoring from DynamoDB")
         states = self.state_watcher.restore(self.jobs.get_names())
         MesosClusterRepository.restore_state(states.get("mesos_state", {}))
 
-        self.jobs.restore_state(states.get("job_state", {}), action_runner, tron_start_time=tron_start_time)
+        self.jobs.restore_state(states.get("job_state", {}), action_runner, boot_time=self.boot_time)
         self.state_watcher.save_metadata()
 
     def __str__(self):
diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py
index 53af32594..a375cbe9a 100644
--- a/tron/serialize/runstate/statemanager.py
+++ b/tron/serialize/runstate/statemanager.py
@@ -2,6 +2,7 @@
 import logging
 import time
 from contextlib import contextmanager
+from typing import Dict
 
 from tron.config import schema
 from tron.core import job
@@ -147,7 +148,7 @@ def restore(self, job_names, skip_validation=False):
 
         jobs = self._restore_dicts(runstate.JOB_STATE, job_names)
         # jobs should be a dictionary that contains job name and number of runs
-        # {'Master.k8s': {'run_nums':[0], 'enabled': True}, 'Master.cits_test_frequent_1': {'run_nums'; [1,0], 'enabled': True}}
+        # {'MASTER.k8s': {'run_nums':[0], 'enabled': True}, 'MASTER.cits_test_frequent_1': {'run_nums': [1,0], 'enabled': True}}
         for job_name, job_state in jobs.items():
             job_state["runs"] = self._restore_runs_for_job(job_name, job_state)
         frameworks = self._restore_dicts(runstate.MESOS_STATE, ["frameworks"])
@@ -179,7 +180,7 @@ def _keys_for_items(self, item_type, names):
         keys = (self._impl.build_key(item_type, name) for name in names)
         return dict(zip(keys, names))
 
-    def _restore_dicts(self, item_type, items):
+    def _restore_dicts(self, item_type, items) -> Dict[str, dict]:
         """Return a dict mapping of the items name to its state data."""
         key_to_item_map = self._keys_for_items(item_type, items)
         key_to_state_map = self._impl.restore(key_to_item_map.keys())
@@ -263,6 +265,7 @@ def update_from_config(self, state_config):
             return False
 
         self.shutdown()
-        # The function below will spin up a thread that will be saving into dynamodb, which will run as daemon
+        # NOTE: this will spin up a thread that will constantly persist data into dynamodb
         self.state_manager = PersistenceManagerFactory.from_config(
             state_config,
         )
diff --git a/tron/trondaemon.py b/tron/trondaemon.py
index 175c29450..a927dab01 100644
--- a/tron/trondaemon.py
+++ b/tron/trondaemon.py
@@ -75,7 +75,7 @@ def __init__(self, options):
         self.signals = {signal.SIGINT: signal.default_int_handler}
         self.manhole_sock = f"{self.options.working_dir}/manhole.sock"
 
-    def run(self, tron_start_time=None):
+    def run(self, 
boot_time=None): with no_daemon_context(self.working_dir, self.lock_file, self.signals): signal_map = { signal.SIGHUP: self._handle_reconfigure, @@ -85,23 +85,20 @@ def run(self, tron_start_time=None): signal.SIGUSR1: self._handle_debug, } signal.pthread_sigmask(signal.SIG_BLOCK, signal_map.keys()) - log.info("Starting the setup process of mcp, api, manhole and runnning the reactor") - self._run_mcp(tron_start_time=tron_start_time) + log.info("Starting setup processes...") + self._run_mcp(boot_time=boot_time) log.info( - f"The master control plane has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" + f"Master Control Program (MCP) setup complete. Time elapsed since Tron started: {time.time() - boot_time}s" ) self._run_www_api() - log.info( - f"The Tron API has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" - ) + log.info(f"Tron API setup complete. Time elapsed since Tron started: {time.time() - boot_time}s") self._run_manhole() - log.info( - f"The manhole has finished setup. Time elapsed since Tron started {time.time() - tron_start_time}s" - ) + log.info(f"Manhole setup complete. Time elapsed since Tron started: {time.time() - boot_time}s") self._run_reactor() log.info( - f"Twisted reactor has started. The Tron API should be up and ready now to receive requests. Time elapsed since Tron started {time.time() - tron_start_time}s" + f"Twisted reactor has started. The Tron API should be up and ready now to receive requests. Time elapsed since Tron started: {time.time() - boot_time}s" ) + log.info("Setup complete!") while True: signum = signal.sigwait(list(signal_map.keys())) @@ -129,16 +126,16 @@ def _run_www_api(self): port = self.options.listen_port reactor.listenTCP(port, site, interface=self.options.listen_host) - def _run_mcp(self, tron_start_time=None): + def _run_mcp(self, boot_time=None): # Local import required because of reactor import in mcp from tron import mcp working_dir = self.options.working_dir config_path = self.options.config_path - self.mcp = mcp.MasterControlProgram(working_dir, config_path) + self.mcp = mcp.MasterControlProgram(working_dir, config_path, boot_time) try: - self.mcp.initial_setup(tron_start_time=tron_start_time) + self.mcp.initial_setup() except Exception as e: msg = "Error in configuration %s: %s" log.exception(msg % (config_path, e)) @@ -146,7 +143,7 @@ def _run_mcp(self, tron_start_time=None): def _run_reactor(self): """Run the twisted reactor.""" - # Starts Tron server + # This is what actually starts the Tron server by starting the Twisted event loop threading.Thread( target=reactor.run, daemon=True, From be8dd2f8d5d79530b12793065f468ec0369cf863 Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Fri, 5 Apr 2024 09:18:58 -0700 Subject: [PATCH 3/6] Address reviews 2.0 --- tests/mcp_test.py | 5 ++--- tron/core/actionrun.py | 2 +- tron/core/job_collection.py | 7 +------ tron/kubernetes.py | 2 +- tron/mcp.py | 9 +++++++-- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/tests/mcp_test.py b/tests/mcp_test.py index 903613e51..2e5965173 100644 --- a/tests/mcp_test.py +++ b/tests/mcp_test.py @@ -144,7 +144,7 @@ class TestMasterControlProgramRestoreState(TestCase): def setup_mcp(self): self.working_dir = tempfile.mkdtemp() self.config_path = tempfile.mkdtemp() - self.boot_time = None + self.boot_time = time.time() self.mcp = mcp.MasterControlProgram(self.working_dir, self.config_path, self.boot_time) self.mcp.jobs = mock.create_autospec(JobCollection) self.mcp.state_watcher = 
mock.create_autospec( @@ -160,7 +160,6 @@ def teardown_mcp(self): def test_restore_state(self, mock_cluster_repo): job_state_data = {"1": "things", "2": "things"} mesos_state_data = {"3": "things", "4": "things"} - boot_time = None state_data = { "mesos_state": mesos_state_data, "job_state": job_state_data, @@ -169,7 +168,7 @@ def test_restore_state(self, mock_cluster_repo): action_runner = mock.Mock() self.mcp.restore_state(action_runner) mock_cluster_repo.restore_state.assert_called_with(mesos_state_data) - self.mcp.jobs.restore_state.assert_called_with(job_state_data, action_runner, boot_time) + self.mcp.jobs.restore_state.assert_called_with(job_state_data, action_runner) if __name__ == "__main__": diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index bce62c2f5..63560fb57 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -1194,7 +1194,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: try: if k8s_cluster.start_schedule_jobs: log.info( - f"We will start submitting tasks (i.e scheduling jobs). Starting with {task.get_kubernetes_id()}" + f"We will start submitting tasks (i.e running the jobs). Starting with {task.get_kubernetes_id()}" ) k8s_cluster.start_schedule_jobs = False k8s_cluster.submit(task) diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index e180337ee..7c066f450 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -1,5 +1,4 @@ import logging -import time from tron.core.job import Job from tron.utils import collections @@ -68,16 +67,12 @@ def update(self, new_job_scheduler): job_scheduler.schedule_reconfigured() return True - def restore_state(self, job_state_data, config_action_runner, boot_time=None): + def restore_state(self, job_state_data, config_action_runner): """ Loops through the jobs and their runs in order to restore state for each run. As we restore state, we will also schedule the next runs for each job """ - if boot_time is not None: - log.info( - f"Tron is restoring state for jobs and will start scheduling them! Time elapsed since Tron started {time.time()- boot_time}" - ) for name, state in job_state_data.items(): self.jobs[name].restore_state(state, config_action_runner) log.info(f"Loaded state for {len(job_state_data)} jobs") diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 97bedd814..99e307565 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -288,7 +288,6 @@ def __init__( self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S # This flag will be used to indicate in logs the first task we start scheduling and running on Kubernetes self.start_schedule_jobs = start_schedule_jobs - # creating a task_proc executor has a couple steps: # * create a TaskProcessor # * load the desired plugin (in this case, the k8s one) @@ -312,6 +311,7 @@ def __init__( # actually create the executor/runner, as mentioned above. self.connect() + log.info("Tron connected to task_proc. 
task_proc will now start scheduling jobs on k8s")
 
     def connect(self) -> None:
         """
diff --git a/tron/mcp.py b/tron/mcp.py
index 40f93a6f2..9d5d2a18f 100644
--- a/tron/mcp.py
+++ b/tron/mcp.py
@@ -168,8 +168,13 @@ def restore_state(self, action_runner):
         log.info("Restoring from DynamoDB")
         states = self.state_watcher.restore(self.jobs.get_names())
         MesosClusterRepository.restore_state(states.get("mesos_state", {}))
-
-        self.jobs.restore_state(states.get("job_state", {}), action_runner, boot_time=self.boot_time)
+        log.info(
+            f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time()- self.boot_time}"
+        )
+        self.jobs.restore_state(states.get("job_state", {}), action_runner)
+        log.info(
+            f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time()- self.boot_time}"
+        )
         self.state_watcher.save_metadata()
 
     def __str__(self):

From 17df30ca6ba36fc4541bee5d895732088c1b8cdd Mon Sep 17 00:00:00 2001
From: Eman Elsabban 
Date: Fri, 5 Apr 2024 11:51:38 -0700
Subject: [PATCH 4/6] Deleting the start_schedule_jobs flag

---
 tron/core/actionrun.py | 5 -----
 tron/kubernetes.py     | 3 ---
 2 files changed, 8 deletions(-)

diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py
index 63560fb57..27d2d90f7 100644
--- a/tron/core/actionrun.py
+++ b/tron/core/actionrun.py
@@ -1192,11 +1192,6 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:
         self.watch(task)
 
         try:
-            if k8s_cluster.start_schedule_jobs:
-                log.info(
-                    f"We will start submitting tasks (i.e running the jobs). Starting with {task.get_kubernetes_id()}"
-                )
-                k8s_cluster.start_schedule_jobs = False
             k8s_cluster.submit(task)
         except Exception:
             log.exception(f"Unable to submit task for ActionRun {self.id}")
diff --git a/tron/kubernetes.py b/tron/kubernetes.py
index 99e307565..ff2a0ed69 100644
--- a/tron/kubernetes.py
+++ b/tron/kubernetes.py
@@ -279,15 +279,12 @@ def __init__(
         enabled: bool = True,
         default_volumes: Optional[List[ConfigVolume]] = None,
         pod_launch_timeout: Optional[int] = None,
-        start_schedule_jobs: bool = True,
     ):
         # general k8s config
         self.kubeconfig_path = kubeconfig_path
         self.enabled = enabled
         self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
         self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
-        # This flag will be used to indicate in logs the first task we start scheduling and running on Kubernetes
-        self.start_schedule_jobs = start_schedule_jobs
         # creating a task_proc executor has a couple steps:
         # * create a TaskProcessor

From bc3ad87a3b30534d5f40c16d555b4f6e0c1ffaf3 Mon Sep 17 00:00:00 2001
From: Eman Elsabban 
Date: Fri, 5 Apr 2024 14:01:45 -0700
Subject: [PATCH 5/6] Making boot_time a required param

---
 bin/trond                                       | 2 +-
 tests/trondaemon_test.py                        | 4 +++-
 tron/mcp.py                                     | 6 +++---
 tron/serialize/runstate/dynamodb_state_store.py | 2 +-
 tron/trondaemon.py                              | 2 +-
 5 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/bin/trond b/bin/trond
index 8d505e367..c6fc9547c 100755
--- a/bin/trond
+++ b/bin/trond
@@ -172,7 +172,7 @@ def main():
     boot_time = time.time()
     setup_environment(args)
     trond = trondaemon.TronDaemon(args)
-    trond.run(boot_time=boot_time)
+    trond.run(boot_time)
 
 
 if __name__ == "__main__":
diff --git a/tests/trondaemon_test.py b/tests/trondaemon_test.py
index 80097faaa..026dd99cd 100644
--- a/tests/trondaemon_test.py
+++ b/tests/trondaemon_test.py
@@ -1,5 +1,6 @@
 import os
 import tempfile
+import 
time from unittest import mock from testifycompat import setup @@ -43,10 +44,11 @@ def test_run_uses_context(self): autospec=None, ) as ndc: ndc.return_value = mock.MagicMock() + boot_time = time.time() ndc.return_value.__enter__.side_effect = RuntimeError() daemon = TronDaemon(mock.Mock()) try: - daemon.run() + daemon.run(boot_time) except Exception: pass assert ndc.call_count == 1 diff --git a/tron/mcp.py b/tron/mcp.py index 9d5d2a18f..de56390b0 100644 --- a/tron/mcp.py +++ b/tron/mcp.py @@ -32,7 +32,7 @@ def get_config_value(seq): class MasterControlProgram: """Central state object for the Tron daemon.""" - def __init__(self, working_dir, config_path, boot_time=None): + def __init__(self, working_dir, config_path, boot_time): super().__init__() self.jobs = JobCollection() self.working_dir = working_dir @@ -169,11 +169,11 @@ def restore_state(self, action_runner): states = self.state_watcher.restore(self.jobs.get_names()) MesosClusterRepository.restore_state(states.get("mesos_state", {})) log.info( - f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time()- self.boot_time}" + f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time() - self.boot_time}" ) self.jobs.restore_state(states.get("job_state", {}), action_runner) log.info( - f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time()- self.boot_time}" + f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time() - self.boot_time}" ) self.state_watcher.save_metadata() diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 57c5363ea..82c64167e 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -136,7 +136,7 @@ def _consume_save_queue(self): log.error(error) with self.save_lock: self.save_queue[key] = val - duration = time.time() - start + duration = start log.info(f"saved {saved} items in {duration}s") if saved < qlen: diff --git a/tron/trondaemon.py b/tron/trondaemon.py index a927dab01..0679580de 100644 --- a/tron/trondaemon.py +++ b/tron/trondaemon.py @@ -75,7 +75,7 @@ def __init__(self, options): self.signals = {signal.SIGINT: signal.default_int_handler} self.manhole_sock = f"{self.options.working_dir}/manhole.sock" - def run(self, boot_time=None): + def run(self, boot_time): with no_daemon_context(self.working_dir, self.lock_file, self.signals): signal_map = { signal.SIGHUP: self._handle_reconfigure, From e867044e2ef445f50cc45b97529b03be5c1eb957 Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Mon, 8 Apr 2024 11:00:01 -0700 Subject: [PATCH 6/6] bring back time.time in duration --- tron/serialize/runstate/dynamodb_state_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 82c64167e..57c5363ea 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -136,7 +136,7 @@ def _consume_save_queue(self): log.error(error) with self.save_lock: self.save_queue[key] = val - duration = start + duration = time.time() - start log.info(f"saved {saved} items in {duration}s") if saved < qlen:
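
The pattern this series converges on is small: capture one `time.time()` timestamp in `bin/trond` before any setup work, thread it into `TronDaemon.run()` and `MasterControlProgram`, and log elapsed seconds at each startup milestone. Below is a minimal, runnable sketch of that pattern; `Daemon` and `log_milestone` are simplified stand-ins for illustration, not the actual Tron classes or methods:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class Daemon:
    """Stand-in for TronDaemon/MasterControlProgram: holds the boot timestamp."""

    def __init__(self, boot_time: float) -> None:
        # boot_time is required rather than defaulted (mirroring PATCH 5/6), so
        # every elapsed-time log measures from the same process-wide origin.
        self.boot_time = boot_time

    def log_milestone(self, name: str) -> None:
        # Elapsed time must be computed as now minus start; logging `start` alone
        # (the regression fixed in PATCH 6/6) would print a raw epoch timestamp.
        log.info(f"{name} complete. Time elapsed since Tron started: {time.time() - self.boot_time:.3f}s")

    def run(self) -> None:
        # Each setup step is followed by an elapsed-time log, as in TronDaemon.run().
        self.log_milestone("MCP setup")
        self.log_milestone("API setup")
        self.log_milestone("Manhole setup")


def main() -> None:
    boot_time = time.time()  # captured first, before any setup work, as in bin/trond
    Daemon(boot_time).run()


if __name__ == "__main__":
    main()
```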