-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding logs to Tron to indicate start and scheduling times - TRON-2152 #948
Changes from 4 commits
0999dbe
b427e14
be8dd2f
17df30c
bc3ad87
e867044
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -22,6 +22,8 @@ def apply_master_configuration(mapping, master_config): | |||||
def get_config_value(seq): | ||||||
return [getattr(master_config, item) for item in seq] | ||||||
|
||||||
# Map various MASTER.yaml config options to functions that will apply said options | ||||||
# for example, we will have MasterControlProgram.configure_eventbus function mapped to eventbus_enabled option | ||||||
for entry in mapping: | ||||||
func, args = entry[0], get_config_value(entry[1:]) | ||||||
func(*args) | ||||||
|
@@ -30,15 +32,16 @@ def get_config_value(seq): | |||||
class MasterControlProgram: | ||||||
"""Central state object for the Tron daemon.""" | ||||||
|
||||||
def __init__(self, working_dir, config_path): | ||||||
def __init__(self, working_dir, config_path, boot_time=None): | ||||||
super().__init__() | ||||||
self.jobs = JobCollection() | ||||||
self.working_dir = working_dir | ||||||
self.config = manager.ConfigManager(config_path) | ||||||
self.context = command_context.CommandContext() | ||||||
self.state_watcher = statemanager.StateChangeWatcher() | ||||||
self.boot_time = time.time() | ||||||
log.info("initialized") | ||||||
self.boot_time = boot_time | ||||||
current_time = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(boot_time)) | ||||||
log.info(f"Initialized. Tron started on {current_time}!") | ||||||
|
||||||
def shutdown(self): | ||||||
EventBus.shutdown() | ||||||
|
@@ -66,7 +69,9 @@ def initial_setup(self): | |||||
"""When the MCP is initialized the config is applied before the state. | ||||||
In this case jobs shouldn't be scheduled until the state is applied. | ||||||
""" | ||||||
# The job schedule factories will be created in the function below | ||||||
self._load_config() | ||||||
# Jobs will also get scheduled (internally) once the state for action runs are restored in restore_state | ||||||
self.restore_state( | ||||||
actioncommand.create_action_runner_factory_from_config( | ||||||
self.config.load().get_master().action_runner, | ||||||
|
@@ -103,16 +108,22 @@ def apply_config(self, config_container, reconfigure=False, namespace_to_reconfi | |||||
|
||||||
# TODO: unify NOTIFY_STATE_CHANGE and simplify this | ||||||
self.job_graph = JobGraph(config_container) | ||||||
# This factory is how Tron internally manages scheduling jobs | ||||||
factory = self.build_job_scheduler_factory(master_config, self.job_graph) | ||||||
updated_jobs = self.jobs.update_from_config( | ||||||
config_container.get_jobs(), | ||||||
factory, | ||||||
reconfigure, | ||||||
namespace_to_reconfigure, | ||||||
) | ||||||
# We will build the schedulers once the watcher is invoked | ||||||
log.info( | ||||||
f"Tron built the schedulers for Tron jobs internally! Time elapsed since Tron started {time.time() - self.boot_time}s" | ||||||
) | ||||||
self.state_watcher.watch_all(updated_jobs, [Job.NOTIFY_STATE_CHANGE, Job.NOTIFY_NEW_RUN]) | ||||||
|
||||||
def build_job_scheduler_factory(self, master_config, job_graph): | ||||||
"""Creates JobSchedulerFactory, which are how Tron tracks job schedules internally""" | ||||||
output_stream_dir = master_config.output_stream_dir or self.working_dir | ||||||
action_runner = actioncommand.create_action_runner_factory_from_config( | ||||||
master_config.action_runner, | ||||||
|
@@ -154,11 +165,16 @@ def restore_state(self, action_runner): | |||||
"""Use the state manager to retrieve to persisted state and apply it | ||||||
to the configured Jobs. | ||||||
""" | ||||||
log.info("restoring") | ||||||
log.info("Restoring from DynamoDB") | ||||||
states = self.state_watcher.restore(self.jobs.get_names()) | ||||||
MesosClusterRepository.restore_state(states.get("mesos_state", {})) | ||||||
|
||||||
log.info( | ||||||
f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time()- self.boot_time}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. huh, i just moved this to my landscape-oriented monitor and noticed that
Suggested change
|
||||||
) | ||||||
self.jobs.restore_state(states.get("job_state", {}), action_runner) | ||||||
log.info( | ||||||
f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time()- self.boot_time}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same nit as above
Suggested change
|
||||||
) | ||||||
self.state_watcher.save_metadata() | ||||||
|
||||||
def __str__(self): | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,7 +75,7 @@ def __init__(self, options): | |
self.signals = {signal.SIGINT: signal.default_int_handler} | ||
self.manhole_sock = f"{self.options.working_dir}/manhole.sock" | ||
|
||
def run(self): | ||
def run(self, boot_time=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (chatting about this internally: but on closer inspection, i think we want to make sure that this is set or add some guard clauses to the log statements below - otherwise, we're leaving a bit of a footgun around) |
||
with no_daemon_context(self.working_dir, self.lock_file, self.signals): | ||
signal_map = { | ||
signal.SIGHUP: self._handle_reconfigure, | ||
|
@@ -85,11 +85,20 @@ def run(self): | |
signal.SIGUSR1: self._handle_debug, | ||
} | ||
signal.pthread_sigmask(signal.SIG_BLOCK, signal_map.keys()) | ||
|
||
self._run_mcp() | ||
log.info("Starting setup processes...") | ||
self._run_mcp(boot_time=boot_time) | ||
log.info( | ||
f"Master Control Program (MCP) setup complete. Time elapsed since Tron started: {time.time() - boot_time}s" | ||
) | ||
self._run_www_api() | ||
log.info(f"Tron API setup complete. Time elapsed since Tron started: {time.time() - boot_time}s") | ||
self._run_manhole() | ||
log.info(f"Manhole setup complete. Time elapsed since Tron started: {time.time() - boot_time}s") | ||
self._run_reactor() | ||
log.info( | ||
f"Twisted reactor has started. The Tron API should be up and ready now to receive requests. Time elapsed since Tron started: {time.time() - boot_time}s" | ||
) | ||
log.info("Setup complete!") | ||
|
||
while True: | ||
signum = signal.sigwait(list(signal_map.keys())) | ||
|
@@ -117,13 +126,13 @@ def _run_www_api(self): | |
port = self.options.listen_port | ||
reactor.listenTCP(port, site, interface=self.options.listen_host) | ||
|
||
def _run_mcp(self): | ||
def _run_mcp(self, boot_time=None): | ||
# Local import required because of reactor import in mcp | ||
from tron import mcp | ||
|
||
working_dir = self.options.working_dir | ||
config_path = self.options.config_path | ||
self.mcp = mcp.MasterControlProgram(working_dir, config_path) | ||
self.mcp = mcp.MasterControlProgram(working_dir, config_path, boot_time) | ||
|
||
try: | ||
self.mcp.initial_setup() | ||
|
@@ -134,6 +143,7 @@ def _run_mcp(self): | |
|
||
def _run_reactor(self): | ||
"""Run the twisted reactor.""" | ||
# This is what actually starts the Tron server by starting the Twisted event loop | ||
threading.Thread( | ||
target=reactor.run, | ||
daemon=True, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just double-checking: is
time.localtime(None)
valid?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, it prints
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we don't pass anything to localtime() or pass None , it converts the current time instead