Skip to content

Commit

Permalink
Add support for Conductor Proxy to work with JEG outside of Conductor…
Browse files Browse the repository at this point in the history
… Cluster (#891)

* Add support for Conductor Proxy to work with JEG outside of Conductor Cluster

Today the Conductor Proxy and all of JEG for Conductor must run through a Conductor EGO Service and be set up in a specific way. This new support allows JEG to run anywhere while offloading workload to Conductor, using JWT tokens for authentication.

* Add async calls for functions doing REST API calls
  • Loading branch information
kjdoyle authored Sep 30, 2020
1 parent b883258 commit f429b1a
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 14 deletions.
210 changes: 197 additions & 13 deletions enterprise_gateway/services/processproxies/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

from .processproxy import RemoteProcessProxy

from notebook.utils import url_unescape

from random import randint

pjoin = os.path.join
local_ip = localinterfaces.public_ips()[0]
poll_interval = float(os.getenv('EG_POLL_INTERVAL', '0.5'))
Expand All @@ -32,34 +36,46 @@ def __init__(self, kernel_manager, proxy_config):
self.driver_id = None
self.env = None
self.rest_credential = None
self.jwt_token = None
self.conductor_endpoint = proxy_config.get('conductor_endpoint',
kernel_manager.conductor_endpoint)
self.ascd_endpoint = self.conductor_endpoint

async def launch_process(self, kernel_cmd, **kwargs):
    """Launches the specified process within a Conductor cluster environment.

    Obtains a Conductor credential either from the EGO_SERVICE_CREDENTIAL
    environment variable (JEG running inside the Conductor cluster) or from a
    JWT token carried in the 'Jwt-Auth-User-Payload' kernel header (JEG running
    outside the cluster), then assembles the spark-submit configuration,
    launches the kernel and waits for remote startup confirmation.

    :param kernel_cmd: kernel launch command (argument list)
    :param kwargs: launch keyword arguments; 'env' and 'kernel_headers' are consumed here
    :return: self, once remote startup has been confirmed
    """
    await super(ConductorClusterProcessProxy, self).launch_process(kernel_cmd, **kwargs)

    # Capture the launch environment once; the original assigned self.env a
    # second time after launch_kernel(), which was redundant (same dict object).
    self.env = kwargs.get('env')
    self.kernel_headers = kwargs.get('kernel_headers')

    # Get Conductor cred from process env
    env_dict = dict(os.environ.copy())
    if env_dict and 'EGO_SERVICE_CREDENTIAL' in env_dict:
        self.rest_credential = env_dict['EGO_SERVICE_CREDENTIAL']
    elif self.kernel_headers and 'Jwt-Auth-User-Payload' in self.kernel_headers:
        # Per-kernel cookie jar so concurrent kernels don't share curl cookies.
        self.env['KERNEL_NOTEBOOK_COOKIE_JAR'] = 'kernelcookie' + str(randint(0, 1000))
        jsonKH = json.loads(self.kernel_headers['Jwt-Auth-User-Payload'])
        self.jwt_token = jsonKH['accessToken']
        # The logon/retrieval path blocks on subprocess curl calls, so keep it
        # off the event loop.
        await asyncio.get_event_loop().run_in_executor(None, self._performConductorJWTLogonAndRetrieval,
                                                       self.jwt_token, self.env)
    else:
        error_message = "ConductorClusterProcessProxy failed to obtain the Conductor credential."
        self.log_and_raise(http_status_code=500, reason=error_message)

    # dynamically update Spark submit parameters (also a blocking REST path)
    await asyncio.get_event_loop().run_in_executor(None, self._update_launch_info, kernel_cmd, self.env)
    # Enable stderr PIPE for the run command
    kwargs.update({'stderr': subprocess.PIPE})
    self.local_proc = self.launch_kernel(kernel_cmd, **kwargs)
    self.pid = self.local_proc.pid
    self.ip = local_ip

    self.log.debug("Conductor cluster kernel launched using Conductor endpoint: {}, pid: {}, Kernel ID: {}, "
                   "cmd: '{}'".format(self.conductor_endpoint, self.local_proc.pid, self.kernel_id, kernel_cmd))
    await self.confirm_remote_startup()
    return self

def _update_launch_info(self, kernel_cmd, **kwargs):
def _update_launch_info(self, kernel_cmd, env_dict):
""" Dynamically assemble the spark-submit configuration passed from NB2KG."""
if any(arg.endswith('.sh') for arg in kernel_cmd):
self.log.debug("kernel_cmd contains execution script")
Expand All @@ -68,7 +84,6 @@ def _update_launch_info(self, kernel_cmd, **kwargs):
cmd = pjoin(kernel_dir, 'bin/run.sh')
kernel_cmd.insert(0, cmd)

env_dict = kwargs.get('env')
# add SPARK_HOME, PYSPARK_PYTHON, update SPARK_OPT to contain SPARK_MASTER and EGO_SERVICE_CREDENTIAL
env_dict['SPARK_HOME'] = env_dict['KERNEL_SPARK_HOME']
env_dict['PYSPARK_PYTHON'] = env_dict['KERNEL_PYSPARK_PYTHON']
Expand All @@ -77,19 +92,86 @@ def _update_launch_info(self, kernel_cmd, **kwargs):
if 'KERNEL_SPARK_OPTS' in env_dict:
user_defined_spark_opts = env_dict['KERNEL_SPARK_OPTS']

# Get updated one_notebook_master_rest_url for KERNEL_NOTEBOOK_MASTER_REST and SPARK_OPTS.
if self.jwt_token is None:
self._update_notebook_master_rest_url(env_dict)

if "--master" not in env_dict['SPARK_OPTS']:
env_dict['SPARK_OPTS'] = '--master {master} --conf spark.ego.credential={rest_cred} ' \
'--conf spark.pyspark.python={pyspark_python} {spark_opts} ' \
'{user_defined_spark_opts}'.\
format(master=env_dict['KERNEL_NOTEBOOK_MASTER_REST'], rest_cred=self.rest_credential,
format(master=env_dict['KERNEL_NOTEBOOK_MASTER_REST'], rest_cred="'" + self.rest_credential + "'",
pyspark_python=env_dict['PYSPARK_PYTHON'], spark_opts=env_dict['SPARK_OPTS'],
user_defined_spark_opts=user_defined_spark_opts)

def _update_notebook_master_rest_url(self, env_dict):
"""Updates the notebook master rest url to update KERNEL_NOTEBOOK_MASTER_REST,
conductor_endpoint, and SPARK_OPTS.
"""

self.log.debug("Updating notebook master rest urls.")
response = None
# Assemble REST call
header = 'Accept: application/json'
authorization = 'Authorization: %s' % self.rest_credential
if 'KERNEL_NOTEBOOK_DATA_DIR' not in env_dict or 'KERNEL_NOTEBOOK_COOKIE_JAR' not in env_dict \
or 'KERNEL_CURL_SECURITY_OPT' not in env_dict:
self.log.warning("Could not find KERNEL environment variables. Not updating notebook master rest url.")
return
if 'CONDUCTOR_REST_URL' not in env_dict or 'KERNEL_SIG_ID' not in env_dict \
or 'KERNEL_NOTEBOOK_MASTER_REST' not in env_dict:
self.log.warning("Could not find CONDUCTOR_REST_URL or KERNEL_SIG_ID or KERNEL_NOTEBOOK_MASTER_REST. "
"Not updating notebook master rest url.")
return

cookie_jar = pjoin(env_dict['KERNEL_NOTEBOOK_DATA_DIR'], env_dict['KERNEL_NOTEBOOK_COOKIE_JAR'])
sslconf = env_dict['KERNEL_CURL_SECURITY_OPT'].split()
ascd_rest_url = env_dict['CONDUCTOR_REST_URL']
ig_id = env_dict['KERNEL_SIG_ID']
url = '%sconductor/v1/instances?id=%s&fields=outputs' % (ascd_rest_url, ig_id)
cmd = ['curl', '-v', '-b', cookie_jar, '-X', 'GET', '-H', header, '-H', authorization, url]
cmd[2:2] = sslconf
# Perform REST call
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
output, stderr = process.communicate()
response = json.loads(output) if output else None
if response is None or len(response) < 1 or not response[0] or not response[0]['outputs']:
response = None
except Exception as e:
self.log.warning("Getting instance group with cmd '{}' failed with exception: '{}'. Continuing...".
format(cmd, e))
return

outputs = response[0]['outputs']

if 'one_notebook_master_rest_url' not in outputs or not outputs['one_notebook_master_rest_url'] \
or 'value' not in outputs['one_notebook_master_rest_url'] \
or not outputs['one_notebook_master_rest_url']['value']:
self.log.warning("Could not get one_notebook_master_rest_url from instance group. "
"Not updating notebook master rest url.")
return
if 'one_notebook_master_web_submission_url' not in outputs \
or not outputs['one_notebook_master_web_submission_url'] \
or 'value' not in outputs['one_notebook_master_web_submission_url'] \
or not outputs['one_notebook_master_web_submission_url']['value']:
self.log.warning("Could not get one_notebook_master_web_submission_url from instance group. "
"Not updating notebook master rest url.")
return

updated_one_notebook_master_rest_url = outputs['one_notebook_master_rest_url']['value']
updated_one_notebook_master_web_submission_url = outputs['one_notebook_master_web_submission_url']['value']

if updated_one_notebook_master_rest_url and updated_one_notebook_master_web_submission_url:
self.log.debug("Updating KERNEL_NOTEBOOK_MASTER_REST to '{}'.".format(updated_one_notebook_master_rest_url))
os.environ['KERNEL_NOTEBOOK_MASTER_REST'] = updated_one_notebook_master_rest_url
env_dict['KERNEL_NOTEBOOK_MASTER_REST'] = updated_one_notebook_master_rest_url
self.conductor_endpoint = updated_one_notebook_master_web_submission_url

def poll(self):
"""Submitting a new kernel/app will take a while to be SUBMITTED.
Thus application ID will probably not be available immediately for poll.
So will regard the application as RUNNING when application ID still in SUBMITTED/WAITING/RUNNING state.
:return: None if the application's ID is available and state is SUBMITTED/WAITING/RUNNING. Otherwise False.
"""
result = False
Expand All @@ -102,7 +184,6 @@ def poll(self):

def send_signal(self, signum):
"""Currently only support 0 as poll and other as kill.
:param signum
:return:
"""
Expand Down Expand Up @@ -172,6 +253,11 @@ def _parse_driver_submission_id(self, submission_response):
if driver_id and len(driver_id) > 0:
self.driver_id = driver_id[0]
self.log.debug("Driver ID: {}".format(driver_id[0]))
# Handle Checking for submission error to report
err_lines = [line for line in submission_response.split('\n') if "Application submission failed" in line]
if err_lines and len(err_lines) > 0:
self.log_and_raise(http_status_code=500,
reason=err_lines[0][err_lines[0].find("Application submission failed"):])

async def confirm_remote_startup(self):
""" Confirms the application is in a started state before returning. Should post-RUNNING states be
Expand Down Expand Up @@ -284,7 +370,6 @@ def load_process_info(self, process_info):

def _query_app_by_driver_id(self, driver_id):
"""Retrieve application by using driver ID.
:param driver_id: as the unique driver id for query
:return: The JSON object of an application. None if driver_id is not found.
"""
Expand Down Expand Up @@ -317,7 +402,6 @@ def _query_app_by_driver_id(self, driver_id):

def _query_app_by_id(self, app_id):
"""Retrieve an application by application ID.
:param app_id
:return: The JSON object of an application. None if app_id is not found.
"""
Expand Down Expand Up @@ -347,7 +431,6 @@ def _query_app_by_id(self, app_id):

def _query_app_state_by_driver_id(self, driver_id):
"""Return the state of an application.
:param driver_id:
:return:
"""
Expand All @@ -361,7 +444,6 @@ def _query_app_state_by_driver_id(self, driver_id):

def _get_driver_by_app_id(self, app_id):
"""Get driver info from application ID.
:param app_id
:return: The JSON response driver information of the corresponding application. None if app_id is not found.
"""
Expand All @@ -378,7 +460,6 @@ def _get_driver_by_app_id(self, app_id):

def _kill_app_by_driver_id(self, driver_id):
"""Kill an application. If the app's state is FINISHED or FAILED, it won't be changed to KILLED.
:param driver_id
:return: The JSON response of killing the application. None if driver is not found.
"""
Expand Down Expand Up @@ -415,3 +496,106 @@ def _kill_app_by_driver_id(self, driver_id):
format(cmd, e))
self.log.debug("Kill response: {}".format(response))
return response

def _performRestCall(self, cmd, url, HA_LIST):
    """Execute a curl-based REST call, retrying across the HA host list.

    For each candidate host, the host portion of *url* is swapped in (scheme
    prefix and the port/path tail are preserved) and the curl command is
    re-run. The first attempt whose stderr shows no connection failure wins.

    :param cmd: curl argument list; its final element is replaced with the rewritten url
    :param url: REST url whose host segment is substituted per attempt
    :param HA_LIST: candidate hostnames to try, in order
    :return: (stdout, stderr) of the first reachable host
    """
    connection_errors = ('Could not resolve host', 'Failed connect to', 'Connection refused')
    for candidate_host in HA_LIST:
        # Rebuild the url as <scheme>://<candidate_host><:port-and-rest>.
        # NOTE(review): assumes the url always carries an explicit port — confirm.
        scheme_sep = url.find('://')
        port_sep = url.rfind(':')
        url = url[0:scheme_sep + 3] + candidate_host + url[port_sep:]
        cmd[-1] = url
        self.log.debug(cmd)
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True,
                                universal_newlines=True)
        stdout_data, stderr_data = proc.communicate()
        if not any(marker in stderr_data for marker in connection_errors):
            return stdout_data, stderr_data
    # Exhausted every host without connecting.
    self.log_and_raise(http_status_code=500, reason='Could not connect to ascd. Verify ascd is running.')
    return 'Error', 'Error'

def _performConductorJWTLogonAndRetrieval(self, jwt_token, env_dict):
    """Authenticate to Conductor with a JWT Token and setup the kernel environment variables.

    Performs, via curl, the sequence: JWT logon -> EGO token logon -> anaconda
    instance lookup -> instance group lookup, populating self.rest_credential,
    env_dict['KERNEL_PYSPARK_PYTHON'], env_dict['KERNEL_SPARK_HOME'],
    env_dict['KERNEL_NOTEBOOK_MASTER_REST'] and self.conductor_endpoint.
    Any failed step raises via self.log_and_raise with HTTP status 500.

    :param jwt_token: JWT Token to authenticate with to Conductor
    :param env_dict: Environment Dictionary of this Kernel launch
    :return: the JSON response of the instance group query, or None when no jwt_token was supplied
    """
    response = None
    if not jwt_token:
        return response
    # Assemble JWT Auth logon REST call
    env = self.env

    if env['KERNEL_IG_UUID'] is None:
        reasonErr = 'Instance group specified is None. Check environment ' \
                    'specified instance group is available.'
        self.log_and_raise(http_status_code=500, reason=reasonErr)

    # Determine hostname of ascd_endpoint and setup the HA List
    # NOTE(review): assumes ascd_endpoint has the form <scheme>://<host>:<port> — confirm.
    portcolon = self.ascd_endpoint.rfind(':')
    slash = self.ascd_endpoint.find('://')
    host = self.ascd_endpoint[slash + 3:portcolon]
    HA_LIST = env['KERNEL_CONDUCTOR_HA_ENDPOINTS'].split(',')
    HA_LIST.insert(0, host)  # try the configured endpoint first, then the HA alternates

    header = 'Accept: application/json'
    authorization = 'Authorization: Bearer %s' % jwt_token
    cookie_jar = pjoin(env['KERNEL_NOTEBOOK_DATA_DIR'], env['KERNEL_NOTEBOOK_COOKIE_JAR'])
    sslconf = env['KERNEL_CURL_SECURITY_OPT'].split()
    url = '%s/auth/logon/jwt?topology=%s' % (self.ascd_endpoint, env['KERNEL_TOPOLOGY'])
    cmd = ['curl', '-v', '-b', cookie_jar, '-X', 'GET', '-H', header, '-H', authorization, url]
    cmd[2:2] = sslconf  # splice curl security options (e.g. cert flags) into the command
    output, stderr = self._performRestCall(cmd, url, HA_LIST)
    # NOTE(review): plain substring test — a successful payload containing 'Error'
    # would also trip this branch; confirm the logon response format.
    if 'Error' in output:
        reasonErr = 'Failed to perform JWT Auth Logon. ' + output.splitlines()[0]
        self.log.warning(cmd)
        self.log_and_raise(http_status_code=500, reason=reasonErr)
    # Body is a quoted, url-escaped credential; unescape and strip the surrounding quotes.
    self.rest_credential = url_unescape(output)[1:-1]

    # Assemble EGO Token Logon REST call
    authorization = 'Authorization: PlatformToken token=' + output.strip('"')
    url = '%s/auth/logon' % self.ascd_endpoint
    # '-c' (write mode) records the session cookies reused by the later '-b' calls.
    cmd = ['curl', '-v', '-c', cookie_jar, '-X', 'GET', '-H', header, '-H', authorization, url]
    cmd[2:2] = sslconf
    output, stderr = self._performRestCall(cmd, url, HA_LIST)
    if 'Error' in output:
        reasonErr = 'Failed to perform EGO Auth Logon. ' + output.splitlines()[0]
        self.log.warning(cmd)
        self.log_and_raise(http_status_code=500, reason=reasonErr)

    # Get the Python path to use to make sure the right conda environment is used
    url = '%s/anaconda/instances/%s' % (self.ascd_endpoint, env['KERNEL_ANACONDA_INST_UUID'])
    cmd = ['curl', '-v', '-b', cookie_jar, '-X', 'GET', '-H', header, '-H', authorization, url]
    cmd[2:2] = sslconf
    output, stderr = self._performRestCall(cmd, url, HA_LIST)
    response = json.loads(output) if output else None
    if response is None or not response['parameters']['deploy_home']['value']:
        reasonErr = 'Could not retrieve anaconda instance. Verify anaconda instance with id '
        reasonErr = reasonErr + env['KERNEL_ANACONDA_INST_UUID'] + ' exists'
        self.log.warning(cmd)
        self.log_and_raise(http_status_code=500, reason=reasonErr)
    else:
        # Point the kernel at the python binary of the requested conda env.
        env_dict['KERNEL_PYSPARK_PYTHON'] = response['parameters']['deploy_home']['value'] \
            + '/anaconda/envs/' + env['KERNEL_ANACONDA_ENV'] + '/bin/python'

    # Get instance group information we need
    url = '%s/instances?id=%s&fields=sparkinstancegroup,outputs' % (self.ascd_endpoint, env['KERNEL_IG_UUID'])
    cmd = ['curl', '-v', '-b', cookie_jar, '-X', 'GET', '-H', header, '-H', authorization, url]
    cmd[2:2] = sslconf
    output, stderr = self._performRestCall(cmd, url, HA_LIST)
    response = json.loads(output) if output else None

    if response is None or len(response) == 0 or response[0] is None:
        reasonErr = 'Could not retrieve instance group. Verify instance group with id ' \
            + env['KERNEL_IG_UUID'] + ' exists.'
        self.log.warning(cmd)
        self.log_and_raise(http_status_code=500, reason=reasonErr)
    # NOTE(review): the 'outputs'/'batch_master_rest_urls' keys are indexed before the
    # 'value' membership test — a started-but-incomplete group could KeyError here; confirm.
    elif response is None or response[0] is None or 'value' not in response[0]['outputs']['batch_master_rest_urls']:
        reasonErr = 'Could not retrieve outputs for instance group. Verify instance group with id ' \
            + env['KERNEL_IG_UUID'] + ' is started'
        self.log.warning(cmd)
        self.log_and_raise(http_status_code=500, reason=reasonErr)
    else:
        # Propagate Spark home, master REST url and the web submission endpoint.
        env_dict['KERNEL_SPARK_HOME'] = response[0]['sparkinstancegroup']['sparkhomedir']
        env_dict['KERNEL_NOTEBOOK_MASTER_REST'] = response[0]['outputs']['batch_master_rest_urls']['value']
        self.conductor_endpoint = response[0]['outputs']['one_batch_master_web_submission_url']['value']
    return response
6 changes: 5 additions & 1 deletion etc/kernelspecs/spark_python_conductor_cluster/bin/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ if [ -z "${SPARK_HOME}" ]; then
exit 1
fi

PROG_HOME="$(cd "`dirname "$0"`"/..; pwd)"
if [ -z "${KERNEL_IG_UUID}" ]; then
PROG_HOME="$(cd "`dirname "$0"`"/..; pwd)"
else
PROG_HOME="${SPARK_HOME}"
fi

eval exec "${IMPERSONATION_OPTS}" \
"${SPARK_HOME}/bin/spark-submit" \
Expand Down

0 comments on commit f429b1a

Please sign in to comment.