Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding logs to Tron to indicate start and scheduling times - TRON-2152 #948

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/trond
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import argparse
import faulthandler
import logging
import os
import time
import traceback

import pkg_resources
Expand Down Expand Up @@ -168,10 +169,10 @@ def setup_environment(args):

def main():
args = parse_cli()

boot_time = time.time()
setup_environment(args)
trond = trondaemon.TronDaemon(args)
trond.run()
trond.run(boot_time)


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion tests/mcp_reconfigure_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for reconfiguring mcp."""
import os
import tempfile
import time

import pytest

Expand Down Expand Up @@ -179,7 +180,7 @@ def _get_runs_to_schedule(self, sched):
@setup
def setup_mcp(self):
self.test_dir = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(self.test_dir, "config")
self.mcp = mcp.MasterControlProgram(self.test_dir, "config", time.time())
config = {schema.MASTER_NAMESPACE: self._get_config(0, self.test_dir)}
container = config_parse.ConfigContainer.create(config)
self.mcp.apply_config(container)
Expand Down
18 changes: 6 additions & 12 deletions tests/mcp_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import shutil
import tempfile
import time
from unittest import mock

import pytest
Expand All @@ -25,10 +26,8 @@ class TestMasterControlProgram:
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
self.working_dir,
self.config_path,
)
self.boot_time = time.time()
self.mcp = mcp.MasterControlProgram(self.working_dir, self.config_path, self.boot_time)
self.mcp.state_watcher = mock.create_autospec(
statemanager.StateChangeWatcher,
)
Expand Down Expand Up @@ -145,10 +144,8 @@ class TestMasterControlProgramRestoreState(TestCase):
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
self.working_dir,
self.config_path,
)
self.boot_time = time.time()
self.mcp = mcp.MasterControlProgram(self.working_dir, self.config_path, self.boot_time)
self.mcp.jobs = mock.create_autospec(JobCollection)
self.mcp.state_watcher = mock.create_autospec(
statemanager.StateChangeWatcher,
Expand All @@ -171,10 +168,7 @@ def test_restore_state(self, mock_cluster_repo):
action_runner = mock.Mock()
self.mcp.restore_state(action_runner)
mock_cluster_repo.restore_state.assert_called_with(mesos_state_data)
self.mcp.jobs.restore_state.assert_called_with(
job_state_data,
action_runner,
)
self.mcp.jobs.restore_state.assert_called_with(job_state_data, action_runner)


if __name__ == "__main__":
Expand Down
4 changes: 3 additions & 1 deletion tests/trondaemon_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import tempfile
import time
from unittest import mock

from testifycompat import setup
Expand Down Expand Up @@ -43,10 +44,11 @@ def test_run_uses_context(self):
autospec=None,
) as ndc:
ndc.return_value = mock.MagicMock()
boot_time = time.time()
ndc.return_value.__enter__.side_effect = RuntimeError()
daemon = TronDaemon(mock.Mock())
try:
daemon.run()
daemon.run(boot_time)
except Exception:
pass
assert ndc.call_count == 1
Expand Down
6 changes: 6 additions & 0 deletions tron/core/job_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def reconfigure_filter(config):
else:
return config.namespace == namespace_to_reconfigure

# NOTE: as this is a generator expression, we will only go through job configs and build a scheduler for them once something iterates over us (i.e, once `self.state_watcher.watch_all()` is called)
seq = (factory.build(config) for config in job_configs.values() if reconfigure_filter(config))
return map_to_job_and_schedule(filter(self.add, seq))

Expand Down Expand Up @@ -67,6 +68,11 @@ def update(self, new_job_scheduler):
return True

def restore_state(self, job_state_data, config_action_runner):
"""
Loops through the jobs and their runs in order to restore
state for each run. As we restore state, we will also schedule the next
runs for each job
"""
for name, state in job_state_data.items():
self.jobs[name].restore_state(state, config_action_runner)
log.info(f"Loaded state for {len(job_state_data)} jobs")
Expand Down
2 changes: 1 addition & 1 deletion tron/core/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def __init__(self, context, output_stream_dir, time_zone, action_runner, job_gra
self.job_graph = job_graph

def build(self, job_config):
log.debug(f"Building new job {job_config.name}")
log.debug(f"Building new job scheduler {job_config.name}")
output_path = filehandler.OutputPath(self.output_stream_dir)
time_zone = job_config.time_zone or self.time_zone
scheduler = scheduler_from_config(job_config.schedule, time_zone)
Expand Down
2 changes: 2 additions & 0 deletions tron/core/jobrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ def get_first_queued(self, node=None):
)

def get_scheduled(self):
# Find the scheduled runs for the jobs and return it
# in most cases, there should just be a single run - but it's possible that a delayed job could have N scheduled runs built up
return [r for r in self.runs if r.state == ActionRun.SCHEDULED]

def next_run_num(self):
Expand Down
2 changes: 1 addition & 1 deletion tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ def __init__(
self.enabled = enabled
self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S

# creating a task_proc executor has a couple steps:
# * create a TaskProcessor
# * load the desired plugin (in this case, the k8s one)
Expand All @@ -309,6 +308,7 @@ def __init__(

# actually create the executor/runner, as mentioned above.
self.connect()
log.info("Tron connected to task_proc. task_proc will start scheduling now the jobs on k8s")

def connect(self) -> None:
"""
Expand Down
26 changes: 21 additions & 5 deletions tron/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def apply_master_configuration(mapping, master_config):
def get_config_value(seq):
return [getattr(master_config, item) for item in seq]

# Map various MASTER.yaml config options to functions that will apply said options
# for example, we will have MasterControlProgram.configure_eventbus function mapped to eventbus_enabled option
for entry in mapping:
func, args = entry[0], get_config_value(entry[1:])
func(*args)
Expand All @@ -30,15 +32,16 @@ def get_config_value(seq):
class MasterControlProgram:
"""Central state object for the Tron daemon."""

def __init__(self, working_dir, config_path):
def __init__(self, working_dir, config_path, boot_time):
super().__init__()
self.jobs = JobCollection()
self.working_dir = working_dir
self.config = manager.ConfigManager(config_path)
self.context = command_context.CommandContext()
self.state_watcher = statemanager.StateChangeWatcher()
self.boot_time = time.time()
log.info("initialized")
self.boot_time = boot_time
current_time = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(boot_time))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just double-checking: is time.localtime(None) valid?

Copy link
Contributor Author

@EmanElsaban EmanElsaban Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, it prints

2024-04-05 05:03:11,461 tron.trondaemon INFO Starting setup processes...
2024-04-05 05:03:19,723 tron.mcp INFO Initialized. Tron started on Fri, 05 Apr 2024 05:03:16!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we don't pass anything to localtime(), or pass None, it uses the current time instead

log.info(f"Initialized. Tron started on {current_time}!")

def shutdown(self):
EventBus.shutdown()
Expand Down Expand Up @@ -66,7 +69,9 @@ def initial_setup(self):
"""When the MCP is initialized the config is applied before the state.
In this case jobs shouldn't be scheduled until the state is applied.
"""
# The job schedule factories will be created in the function below
self._load_config()
# Jobs will also get scheduled (internally) once the state for action runs are restored in restore_state
self.restore_state(
actioncommand.create_action_runner_factory_from_config(
self.config.load().get_master().action_runner,
Expand Down Expand Up @@ -103,16 +108,22 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi

# TODO: unify NOTIFY_STATE_CHANGE and simplify this
self.job_graph = JobGraph(config_container)
# This factory is how Tron internally manages scheduling jobs
factory = self.build_job_scheduler_factory(master_config, self.job_graph)
updated_jobs = self.jobs.update_from_config(
config_container.get_jobs(),
factory,
reconfigure,
namespace_to_reconfigure,
)
# We will build the schedulers once the watcher is invoked
log.info(
f"Tron built the schedulers for Tron jobs internally! Time elapsed since Tron started {time.time() - self.boot_time}s"
)
self.state_watcher.watch_all(updated_jobs, [Job.NOTIFY_STATE_CHANGE, Job.NOTIFY_NEW_RUN])

def build_job_scheduler_factory(self, master_config, job_graph):
"""Creates JobSchedulerFactory, which are how Tron tracks job schedules internally"""
output_stream_dir = master_config.output_stream_dir or self.working_dir
action_runner = actioncommand.create_action_runner_factory_from_config(
master_config.action_runner,
Expand Down Expand Up @@ -154,11 +165,16 @@ def restore_state(self, action_runner):
"""Use the state manager to retrieve to persisted state and apply it
to the configured Jobs.
"""
log.info("restoring")
log.info("Restoring from DynamoDB")
states = self.state_watcher.restore(self.jobs.get_names())
MesosClusterRepository.restore_state(states.get("mesos_state", {}))

log.info(
f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time() - self.boot_time}"
)
self.jobs.restore_state(states.get("job_state", {}), action_runner)
log.info(
f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time() - self.boot_time}"
)
self.state_watcher.save_metadata()

def __str__(self):
Expand Down
2 changes: 1 addition & 1 deletion tron/serialize/runstate/dynamodb_state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _consume_save_queue(self):
log.error(error)
with self.save_lock:
self.save_queue[key] = val
duration = time.time() - start
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nopee, will commit the past version

duration = start
log.info(f"saved {saved} items in {duration}s")

if saved < qlen:
Expand Down
6 changes: 5 additions & 1 deletion tron/serialize/runstate/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import time
from contextlib import contextmanager
from typing import Dict

from tron.config import schema
from tron.core import job
Expand Down Expand Up @@ -146,6 +147,8 @@ def restore(self, job_names, skip_validation=False):
self._restore_metadata()

jobs = self._restore_dicts(runstate.JOB_STATE, job_names)
# jobs should be a dictionary that contains job name and number of runs
# {'MASTER.k8s': {'run_nums':[0], 'enabled': True}, 'MASTER.cits_test_frequent_1': {'run_nums': [1,0], 'enabled': True}}
for job_name, job_state in jobs.items():
job_state["runs"] = self._restore_runs_for_job(job_name, job_state)
frameworks = self._restore_dicts(runstate.MESOS_STATE, ["frameworks"])
Expand Down Expand Up @@ -177,7 +180,7 @@ def _keys_for_items(self, item_type, names):
keys = (self._impl.build_key(item_type, name) for name in names)
return dict(zip(keys, names))

def _restore_dicts(self, item_type, items):
def _restore_dicts(self, item_type, items) -> Dict[str, dict]:
"""Return a dict mapping of the items name to its state data."""
key_to_item_map = self._keys_for_items(item_type, items)
key_to_state_map = self._impl.restore(key_to_item_map.keys())
Expand Down Expand Up @@ -263,6 +266,7 @@ def update_from_config(self, state_config):
return False

self.shutdown()
# NOTE: this will spin up a thread that will constantly persist data into dynamodb
self.state_manager = PersistenceManagerFactory.from_config(
state_config,
)
Expand Down
20 changes: 15 additions & 5 deletions tron/trondaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, options):
self.signals = {signal.SIGINT: signal.default_int_handler}
self.manhole_sock = f"{self.options.working_dir}/manhole.sock"

def run(self):
def run(self, boot_time):
with no_daemon_context(self.working_dir, self.lock_file, self.signals):
signal_map = {
signal.SIGHUP: self._handle_reconfigure,
Expand All @@ -85,11 +85,20 @@ def run(self):
signal.SIGUSR1: self._handle_debug,
}
signal.pthread_sigmask(signal.SIG_BLOCK, signal_map.keys())

self._run_mcp()
log.info("Starting setup processes...")
self._run_mcp(boot_time=boot_time)
log.info(
f"Master Control Program (MCP) setup complete. Time elapsed since Tron started: {time.time() - boot_time}s"
)
self._run_www_api()
log.info(f"Tron API setup complete. Time elapsed since Tron started: {time.time() - boot_time}s")
self._run_manhole()
log.info(f"Manhole setup complete. Time elapsed since Tron started: {time.time() - boot_time}s")
self._run_reactor()
log.info(
f"Twisted reactor has started. The Tron API should be up and ready now to receive requests. Time elapsed since Tron started: {time.time() - boot_time}s"
)
log.info("Setup complete!")

while True:
signum = signal.sigwait(list(signal_map.keys()))
Expand Down Expand Up @@ -117,13 +126,13 @@ def _run_www_api(self):
port = self.options.listen_port
reactor.listenTCP(port, site, interface=self.options.listen_host)

def _run_mcp(self):
def _run_mcp(self, boot_time=None):
# Local import required because of reactor import in mcp
from tron import mcp

working_dir = self.options.working_dir
config_path = self.options.config_path
self.mcp = mcp.MasterControlProgram(working_dir, config_path)
self.mcp = mcp.MasterControlProgram(working_dir, config_path, boot_time)

try:
self.mcp.initial_setup()
Expand All @@ -134,6 +143,7 @@ def _run_mcp(self):

def _run_reactor(self):
"""Run the twisted reactor."""
# This is what actually starts the Tron server by starting the Twisted event loop
threading.Thread(
target=reactor.run,
daemon=True,
Expand Down
Loading