Add timeout to all outgoing http requests (#338)
Currently, if a slave becomes unresponsive, the master service can
freeze up for a long time (or forever). The slave allocation loop
hangs while trying to contact the slave, and no builds are able to
start.

This adds a timeout to all requests between the master and slaves,
along with corresponding error handling. I made the timeout on
slave->master calls longer because we don't currently have good
request metrics and have had issues with request load on the master
in the past (which we should improve separately).
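A minimal sketch of the pattern this commit applies, using the requests library directly rather than ClusterRunner's Network wrapper; the URL, timeout value, and function name below are illustrative only:

import requests

def get_with_timeout(url='http://localhost:43000/v1/slave', timeout=30):
    """Fetch a URL without hanging forever: treat a timeout like a connection failure."""
    try:
        # timeout= makes requests raise requests.Timeout instead of blocking indefinitely.
        response = requests.get(url, timeout=timeout)
        return response.json()
    except (requests.ConnectionError, requests.Timeout):
        # The caller can mark the slave dead and move on rather than freezing the allocation loop.
        return None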
josephharrington authored May 10, 2017
1 parent ea71b49 commit fe1ce19
Showing 11 changed files with 249 additions and 45 deletions.
40 changes: 40 additions & 0 deletions app/client/cluster_api_client.py
@@ -171,6 +171,46 @@ def get_slaves(self):
response = self._network.get(slave_url)
return response.json()

def connect_slave(self, slave_url: str, num_executors: int=10) -> int:
"""
Connect a slave to the master. This is mostly useful for testing since real slave services
make this call to the master on startup.
:param slave_url: The hostname and port of the slave, e.g., 'localhost:43001'
:param num_executors: The number of executors for the slave
:return: The new slave id
"""
data = {
'slave': slave_url,
'num_executors': num_executors,
}
create_slave_url = self._api.url('slave')
response_data = self._network.post(create_slave_url, data=data).json()
slave_id = int(response_data['slave_id'])
return slave_id

def get_slave_status(self, slave_id: int) -> dict:
"""
Send a get request to the master to get the status of the specified slave.
:param slave_id: The id of the slave
:return: The API response data
"""
slave_status_url = self._api.url('slave', slave_id)
response_data = self._network.get(slave_status_url).json()
return response_data['slave']

def block_until_slave_offline(self, slave_id: int, timeout: int=None) -> bool:
"""
Poll the slave status endpoint until the slave is no longer alive.
:param slave_id: The id of the slave to wait for
:param timeout: The maximum number of seconds to wait until giving up, or None for no timeout
:return: Whether the slave went offline during the timeout
"""
def is_slave_offline():
slave_data = self.get_slave_status(slave_id)
return not slave_data['is_alive']

return poll.wait_for(is_slave_offline, timeout_seconds=timeout)

def graceful_shutdown_slaves_by_id(self, slave_ids):
"""
:type slave_ids: list[int]
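As a hedged illustration (not part of this commit), the new client methods above might be exercised from a test roughly as follows; client is assumed to be an already-constructed cluster API client exposing these methods, and the hostname, port, and timeout are made up:

def shut_down_slave_and_wait(client, slave_url='localhost:43001'):
    """Illustrative only: connect a slave, shut it down, and wait for it to go offline."""
    slave_id = client.connect_slave(slave_url, num_executors=10)
    assert client.get_slave_status(slave_id)['is_alive']
    client.graceful_shutdown_slaves_by_id([slave_id])
    # Give up after 60 seconds instead of blocking forever.
    return client.block_until_slave_offline(slave_id, timeout=60)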
1 change: 1 addition & 0 deletions app/master/build.py
@@ -256,6 +256,7 @@ def _mark_subjob_complete(self, subjob_id):
# We use a local variable here which was set inside the _build_completion_lock to prevent a race condition
if should_trigger_postbuild_tasks:
self._logger.info("All results received for build {}!", self._build_id)
# todo: This should not be a SafeThread. https://github.com/box/ClusterRunner/issues/323
SafeThread(target=self._perform_async_postbuild_tasks, name='PostBuild{}'.format(self._build_id)).start()

def mark_started(self):
17 changes: 11 additions & 6 deletions app/master/build_scheduler.py
@@ -1,7 +1,7 @@
from queue import Empty
from threading import Lock

from app.master.slave import SlaveMarkedForShutdownError
from app.master.slave import Slave, SlaveMarkedForShutdownError
from app.util import analytics
from app.util.log import get_logger

@@ -60,19 +60,24 @@ def needs_more_slaves(self):
return False
return True

def allocate_slave(self, slave):
def allocate_slave(self, slave: Slave) -> bool:
"""
Allocate a slave to this build. This tells the slave to execute setup commands for this build.
:type slave: Slave
:param slave: The slave to allocate
:return: Whether slave allocation was successful; this can fail if the slave is unresponsive
"""
if not self._build_started:
self._build_started = True
self._build.mark_started()
self._slaves_allocated.append(slave)
slave.setup(self._build, executor_start_index=self._num_executors_allocated)

# Increment executors before triggering setup. This helps make sure the build won't take down
# every slave in the cluster if setup calls fail because of a problem with the build.
next_executor_index = self._num_executors_allocated
self._num_executors_allocated += min(slave.num_executors, self._max_executors_per_slave)
analytics.record_event(analytics.BUILD_SETUP_START, build_id=self._build.build_id(), slave_id=slave.id)
self._slaves_allocated.append(slave)

return slave.setup(self._build, executor_start_index=next_executor_index)

def begin_subjob_executions_on_slave(self, slave):
"""
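A hedged sketch of how a caller could consume allocate_slave()'s new boolean return value; the loop below is illustrative and not the actual slave allocation loop:

def allocate_idle_slaves(scheduler, idle_slaves):
    """Illustrative only: hand idle slaves to a build's scheduler, skipping unresponsive ones."""
    allocated = 0
    for slave in idle_slaves:
        if not scheduler.needs_more_slaves():
            break
        # allocate_slave() now returns False (and marks the slave dead) instead of hanging
        # when the setup call cannot reach the slave, so the loop just moves on.
        if scheduler.allocate_slave(slave):
            allocated += 1
    return allocated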
59 changes: 37 additions & 22 deletions app/master/slave.py
@@ -1,5 +1,6 @@
import requests.exceptions
import requests

from app.master.build import Build
from app.util import analytics, log
from app.util.counter import Counter
from app.util.network import Network
@@ -62,15 +63,14 @@ def mark_as_idle(self):
self.kill()
raise SlaveMarkedForShutdownError

def setup(self, build, executor_start_index):
def setup(self, build: Build, executor_start_index: int) -> bool:
"""
Execute a setup command on the slave for the specified build. The setup process executes asynchronously on the
slave and the slave will alert the master when setup is complete and it is ready to start working on subjobs.
:param build: The build to set up this slave to work on
:type build: Build
:param executor_start_index: The index the slave should number its executors from for this build
:type executor_start_index: int
:return: Whether or not the call to start setup on the slave was successful
"""
slave_project_type_params = build.build_request.build_parameters().copy()
slave_project_type_params.update(build.project_type.slave_param_overrides())
@@ -82,17 +82,28 @@ def setup(self, build, executor_start_index):
}

self.current_build_id = build.build_id()
self._network.post_with_digest(setup_url, post_data, Secret.get())
try:
self._network.post_with_digest(setup_url, post_data, Secret.get())
except (requests.ConnectionError, requests.Timeout) as ex:
self._logger.warning('Setup call to {} failed with {}: {}.', self, ex.__class__.__name__, str(ex))
self.mark_dead()
return False
return True

def teardown(self):
"""
Tell the slave to run the build teardown
"""
if self.is_alive():
teardown_url = self._slave_api.url('build', self.current_build_id, 'teardown')
self._network.post(teardown_url)
else:
if not self.is_alive():
self._logger.notice('Teardown request to slave {} was not sent since slave is disconnected.', self.url)
return

teardown_url = self._slave_api.url('build', self.current_build_id, 'teardown')
try:
self._network.post(teardown_url)
except (requests.ConnectionError, requests.Timeout):
self._logger.warning('Teardown request to slave failed because slave is unresponsive.')
self.mark_dead()

def start_subjob(self, subjob):
"""
Expand All @@ -105,6 +116,7 @@ def start_subjob(self, subjob):
raise SlaveMarkedForShutdownError('Tried to start a subjob on a slave in shutdown mode. ({}, id: {})'
.format(self.url, self.id))

# todo: This should not be a SafeThread. https://github.com/box/ClusterRunner/issues/337
SafeThread(target=self._async_start_subjob, args=(subjob,)).start()

def _async_start_subjob(self, subjob):
@@ -134,7 +146,7 @@ def free_executor(self):
raise Exception('Cannot free executor on slave {}. All are free.'.format(self.url))
return new_count

def is_alive(self, use_cached=True):
def is_alive(self, use_cached: bool=True) -> bool:
"""
Is the slave API responsive?
@@ -143,8 +155,7 @@ def is_alive(self, use_cached=True):
:param use_cached: Should we use the last returned value of the network check to the slave? If True,
will return cached value. If False, this method will perform an actual network call to the slave.
:type use_cached: bool
:rtype: bool
:return: Whether or not the slave is alive
"""
if use_cached:
return self._is_alive
Expand All @@ -153,21 +164,20 @@ def is_alive(self, use_cached=True):
response = self._network.get(self._slave_api.url(), headers=self._expected_session_header())

if not response.ok:
self._is_alive = False
self.mark_dead()
else:
response_data = response.json()

if 'slave' not in response_data or 'is_alive' not in response_data['slave']:
self._logger.warning('{}\'s API is missing key slave[\'is_alive\'].', self.url)
self._is_alive = False
self.mark_dead()
elif not isinstance(response_data['slave']['is_alive'], bool):
self._logger.warning('{}\'s API key \'is_alive\' is not a boolean.', self.url)
self._is_alive = False
self.mark_dead()
else:
self._is_alive = response_data['slave']['is_alive']
except requests.exceptions.ConnectionError:
self._logger.warning('Slave with url {} is offline.', self.url)
self._is_alive = False
except (requests.ConnectionError, requests.Timeout):
self.mark_dead()

return self._is_alive

@@ -196,17 +206,22 @@ def is_shutdown(self):

def kill(self):
"""
Instructs the slave process to kill itself.
Instruct the slave process to kill itself.
"""
self._logger.notice('Killing {}', self)
kill_url = self._slave_api.url('kill')
self._network.post_with_digest(kill_url, {}, Secret.get())
try:
self._network.post_with_digest(kill_url, {}, Secret.get())
except (requests.ConnectionError, requests.Timeout):
pass
self.mark_dead()

def mark_dead(self):
"""
Marks the slave dead.
Mark the slave dead.
"""
self.set_is_alive(False)
self._logger.warning('{} has gone offline. Last build: {}', self, self.current_build_id)
self._is_alive = False
self.current_build_id = None
self._network.reset_session() # Close any pooled connections for this slave.

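For illustration only, the distinction between the cached and uncached liveness checks after this change; slave is assumed to be an app.master.slave.Slave instance:

def refresh_liveness(slave):
    """Illustrative only: trust the cached flag when it says alive, otherwise re-check over the network."""
    if slave.is_alive():  # cached flag, no network I/O
        return True
    # Performs a real HTTP request; on requests.ConnectionError or requests.Timeout the
    # slave is marked dead and this returns False.
    return slave.is_alive(use_cached=False)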
2 changes: 1 addition & 1 deletion app/slave/cluster_slave.py
@@ -252,7 +252,7 @@ def _is_master_responsive(self):
is_responsive = True
try:
self._network.get(self._master_api.url())
except requests.ConnectionError:
except (requests.ConnectionError, requests.Timeout):
is_responsive = False

return is_responsive
3 changes: 3 additions & 0 deletions app/util/conf/base_config_loader.py
@@ -76,6 +76,8 @@ def configure_defaults(self, conf):
conf.set('master_port', '43000')
conf.set('slaves', ['localhost'])

conf.set('default_http_timeout', 30)

# Strict host key checking on git remote operations, disabled by default
conf.set('git_strict_host_key_checking', False)

@@ -141,6 +143,7 @@ def _get_config_file_whitelisted_keys(self):
'git_strict_host_key_checking',
'cors_allowed_origins_regex',
'get_project_from_master',
'default_http_timeout',
]

def _load_section_from_config_file(self, config, config_filename, section):
4 changes: 4 additions & 0 deletions app/util/conf/slave_config_loader.py
@@ -20,6 +20,10 @@ def configure_defaults(self, conf):
conf.set('master_hostname', 'localhost')
conf.set('master_port', 43000)
conf.set('shallow_clones', True)
# Use a longer timeout for slaves since we don't yet have request metrics on the slave side and since
# slaves are more likely to encounter long response times on the master due to the master being a
# centralized hub with a single-threaded server.
conf.set('default_http_timeout', 120)

def configure_postload(self, conf):
"""
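Because 'default_http_timeout' is whitelisted in the base loader above, it can presumably also be overridden from clusterrunner.conf. A hedged example, assuming an INI-style config with a [general] section (the section name and the value 60 are assumptions, not from this commit); left unset, the defaults remain 30 seconds on the master and 120 seconds on slaves:

# clusterrunner.conf (illustrative)
[general]
default_http_timeout = 60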
18 changes: 16 additions & 2 deletions app/util/network.py
@@ -4,6 +4,7 @@
import requests
from requests.adapters import HTTPAdapter, DEFAULT_POOLSIZE

from app.util.conf.configuration import Configuration
from app.util.decorators import retry_on_exception_exponential_backoff
from app.util.log import get_logger
from app.util.secret import Secret
@@ -100,7 +101,17 @@ def encode_body(self, body_decoded):
"""
return json.dumps(body_decoded)

def _request(self, method, url, data=None, should_encode_body=True, error_on_failure=False, *args, **kwargs):
def _request(
self,
method,
url,
data=None,
should_encode_body=True,
error_on_failure=False,
timeout=None,
*args,
**kwargs
):
"""
A wrapper around requests library network request methods (e.g., GET, POST). We can add functionality for
unimplemented request methods as needed. We also do some mutation on request bodies to make receiving data (in
@@ -118,8 +129,11 @@ def _request(self, method, url, data=None, should_encode_body=True, error_on_fai
:type should_encode_body: bool
:param error_on_failure: If true, raise an error when the response is not in the 200s
:type error_on_failure: bool
:param timeout: The timeout in seconds for this request; raises requests.Timeout upon timeout.
:type timeout: int|None
:rtype: requests.Response
"""
timeout = timeout or Configuration['default_http_timeout']

# If data type is dict, we json-encode it and nest the encoded string inside a new dict. This prevents the
# requests library from trying to directly urlencode the key value pairs of the original data, which will
@@ -130,7 +144,7 @@ def _request(self, method, url, data=None, should_encode_body=True, error_on_fai
if should_encode_body and isinstance(data_to_send, dict):
data_to_send = {ENCODED_BODY: self.encode_body(data_to_send)}

resp = self._session.request(method, url, data=data_to_send, *args, **kwargs)
resp = self._session.request(method, url, data=data_to_send, timeout=timeout, *args, **kwargs)
if not resp.ok and error_on_failure:
raise _RequestFailedError('Request to {} failed with status_code {} and response "{}"'.
format(url, str(resp.status_code), resp.text))
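A hedged usage sketch of the per-request override (not from this commit): it assumes Network() is constructible with its defaults and that the get() wrapper forwards keyword arguments through to _request(); the URL is illustrative:

import requests

from app.util.network import Network

def fetch_master_status(url='http://localhost:43000/v1'):
    """Illustrative only: make one request with a tighter timeout than the configured default."""
    network = Network()
    try:
        # With no timeout argument, Configuration['default_http_timeout'] applies;
        # passing one here overrides the default for just this request.
        return network.get(url, timeout=5).json()
    except (requests.ConnectionError, requests.Timeout):
        return None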
12 changes: 10 additions & 2 deletions test/framework/functional/base_functional_test_case.py
@@ -1,12 +1,12 @@
from contextlib import suppress
import http.client
import os
from os import path
import shutil
import tempfile
from unittest import TestCase

from app.util import fs, log
from app.util.conf.base_config_loader import BaseConfigLoader
from app.util.conf.configuration import Configuration
from app.util.process_utils import is_windows
from app.util.network import Network
from app.util.secret import Secret
@@ -26,11 +26,19 @@ def setUp(self):
# Configure logging to go to stdout. This makes debugging easier by allowing us to see logs for failed tests.
log.configure_logging('DEBUG')

self._reset_config()
Secret.set('testsecret')

self.cluster = FunctionalTestCluster(verbose=self._get_test_verbosity())
self._network = Network()

def _reset_config(self):
Configuration.reset_singleton()
config = Configuration.singleton()
conf_loader = BaseConfigLoader()
conf_loader.configure_defaults(config)
conf_loader.configure_postload(config)

def tearDown(self):
# Give the cluster a bit of extra time to finish working (before forcefully killing it and failing the test)
with suppress(TestClusterTimeoutError):