Skip to content

Commit

Permalink
[idbroker] Refactor IDBroker code and improve HA
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshg999 committed Mar 6, 2024
1 parent a17d499 commit 1be02d8
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 204 deletions.
5 changes: 2 additions & 3 deletions desktop/core/src/desktop/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2623,9 +2623,8 @@ def is_cm_managed():
def is_gs_enabled():
from desktop.lib.idbroker import conf as conf_idbroker # Circular dependencies desktop.conf -> idbroker.conf -> desktop.conf

return ('default' in list(GC_ACCOUNTS.keys()) and GC_ACCOUNTS['default'].JSON_CREDENTIALS.get()) or \
conf_idbroker.is_idbroker_enabled('gs') or \
is_raz_gs()
return ('default' in list(GC_ACCOUNTS.keys()) and GC_ACCOUNTS['default'].JSON_CREDENTIALS.get()) or is_raz_gs() or \
conf_idbroker.is_idbroker_enabled('gs')

def has_gs_access(user):
from desktop.auth.backend import is_admin
Expand Down
19 changes: 11 additions & 8 deletions desktop/core/src/desktop/lib/idbroker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

from builtins import object
import logging

Expand Down Expand Up @@ -48,8 +46,11 @@ def from_core_site(cls, fs=None, user=None):


def __init__(self, user=None, address=None, dt_path=None, path=None, security=None):
self.user=user
self.address=address
self.user = user
self.address = address
if not self.address:
raise PopupException('Failed to connect to IDBroker: No active or healthy instance was found.')

self.dt_path = dt_path
self.path = path
self.security = security
Expand All @@ -60,9 +61,9 @@ def __init__(self, user=None, address=None, dt_path=None, path=None, security=No
def _knox_token_params(self):
if self.user:
if self.security['type'] == 'kerberos':
return { 'doAs': self.user }
return {'doAs': self.user}
else:
return { 'user.name': self.user }
return {'user.name': self.user}
else:
return None

Expand All @@ -73,7 +74,8 @@ def get_auth_token(self):
elif self.security['type'] == 'basic':
self._client.set_basic_auth(self.security['params']['username'], self.security['params']['password'])
try:
res = self._root.invoke("GET", self.dt_path + _KNOX_TOKEN_API, self._knox_token_params(), allow_redirects=True, log_response=False) # Can't log response because returns credentials
# Can't log response because returns credentials
res = self._root.invoke("GET", self.dt_path + _KNOX_TOKEN_API, self._knox_token_params(), allow_redirects=True, log_response=False)
return res.get('access_token')
except Exception as e:
raise PopupException('Failed to authenticate to IDBroker with error: %s' % e.message)
Expand All @@ -82,6 +84,7 @@ def get_auth_token(self):
def get_cab(self):
self._client.set_bearer_auth(self.get_auth_token())
try:
return self._root.invoke("GET", self.path + _CAB_API_CREDENTIALS_GLOBAL, allow_redirects=True, log_response=False) # Can't log response because returns credentials
# Can't log response because returns credentials
return self._root.invoke("GET", self.path + _CAB_API_CREDENTIALS_GLOBAL, allow_redirects=True, log_response=False)
except Exception as e:
raise PopupException('Failed to obtain storage credentials from IDBroker with error: %s' % e.message)
58 changes: 28 additions & 30 deletions desktop/core/src/desktop/lib/idbroker/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,54 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import logging
import sys
import requests

from requests_kerberos import HTTPKerberosAuth
from hadoop.core_site import get_conf

if sys.version_info[0] > 2:
from django.utils.translation import gettext_lazy as _t
else:
from django.utils.translation import ugettext_lazy as _t
from django.utils.translation import gettext_lazy as _t


LOG = logging.getLogger()


_CNF_CAB_ADDRESS = 'fs.%s.ext.cab.address' # http://host:8444/gateway
_CNF_CAB_ADDRESS_DT_PATH = 'fs.%s.ext.cab.dt.path' # dt
_CNF_CAB_ADDRESS_PATH = 'fs.%s.ext.cab.path' # aws-cab
_CNF_CAB_USERNAME = 'fs.%s.ext.cab.username' # when not using kerberos
_CNF_CAB_PASSWORD = 'fs.%s.ext.cab.password'

SUPPORTED_FS = {'s3a': 's3a', 'adl': 'azure', 'abfs': 'azure', 'azure': 'azure', 'gs': 'gs'}

def validate_fs(fs=None):
if fs in SUPPORTED_FS:
return SUPPORTED_FS[fs]
else:
LOG.warning('Selected FS %s is not supported by Hue IDBroker client' % fs)
LOG.warning('Selected filesystem %s is not supported by Hue IDBroker client.' % fs)
return None

def _handle_idbroker_ha(fs=None):
fs = validate_fs(fs)
idbrokeraddr = get_conf().get(_CNF_CAB_ADDRESS % fs) if fs else None
response = None
idbroker_addr_list = []
if fs:
id_broker_addr = get_conf().get(_CNF_CAB_ADDRESS % fs)
if id_broker_addr:
id_broker_addr_list = id_broker_addr.split(',')
for id_broker_addr in id_broker_addr_list:
try:
response = requests.get(id_broker_addr.rstrip('/') + '/dt/knoxtoken/api/v1/token', auth=HTTPKerberosAuth(), verify=False)
except Exception as e:
if 'Name or service not known' in str(e):
LOG.warn('IDBroker %s is not available for use' % id_broker_addr)
# Check response for None and if response code is successful (200) or authentication needed (401)
if (response is not None) and (response.status_code in (200, 401)):
idbrokeraddr = id_broker_addr
break
return idbrokeraddr
else:
return idbrokeraddr
else:
return idbrokeraddr
idbroker_addr = get_conf().get(_CNF_CAB_ADDRESS % fs, '')
idbroker_addr_list = idbroker_addr.split(',')

response = None
for idb in idbroker_addr_list:
try:
response = requests.get(idb.rstrip('/') + '/dt/knoxtoken/api/v1/token', auth=HTTPKerberosAuth(), verify=False)
except Exception as e:
if 'Failed to establish a new connection' in str(e):
LOG.warning('IDBroker URL %s is not available.' % idb)

# Check response for None and if response code is successful (200) or authentication needed (401)
if (response is not None) and (response.status_code in (200, 401)):
return idb


def get_cab_address(fs=None):
fs = validate_fs(fs)
return _handle_idbroker_ha(fs)

def get_cab_dt_path(fs=None):
Expand All @@ -89,7 +82,12 @@ def get_cab_password(fs=None):
def is_idbroker_enabled(fs=None):
from desktop.conf import RAZ # Must be imported dynamically in order to have proper value

return get_cab_address(fs) is not None and not RAZ.IS_ENABLED.get() # Skipping IDBroker for FS when RAZ is present
fs = validate_fs(fs)
idbroker_addr_from_coresite = get_conf().get(_CNF_CAB_ADDRESS % fs)

# When RAZ is configured, skip checking for IDBroker configs from core-site.
# RAZ gets precedence over IDBroker when both are configured in Hue.
return (not RAZ.IS_ENABLED.get() and bool(idbroker_addr_from_coresite))

def config_validator():
res = []
Expand Down
70 changes: 36 additions & 34 deletions desktop/core/src/desktop/lib/idbroker/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,65 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import logging
import unittest
import sys

from nose.tools import assert_equal, assert_true
from nose.tools import assert_equal
from unittest.mock import patch

from desktop.lib.idbroker.client import IDBroker

if sys.version_info[0] > 2:
from unittest.mock import patch
else:
from mock import patch

LOG = logging.getLogger()


class TestIDBroker(unittest.TestCase):
def test_username_authentication(self):
with patch('desktop.lib.idbroker.conf.get_conf') as conf:
with patch('desktop.lib.idbroker.client.resource.Resource.invoke') as invoke:
with patch('desktop.lib.idbroker.client.http_client.HttpClient.set_basic_auth') as set_basic_auth:
conf.return_value = {
'fs.s3a.ext.cab.address': 'address',
'fs.s3a.ext.cab.dt.path': 'dt_path',
'fs.s3a.ext.cab.path': 'path',
'fs.s3a.ext.cab.username': 'username',
'fs.s3a.ext.cab.password': 'password'
}
invoke.return_value = {
'Credentials': 'Credentials'
}
client = IDBroker.from_core_site('s3a', 'test')

cab = client.get_cab()
assert_equal(invoke.call_count, 2) # get_cab calls twice
assert_equal(cab.get('Credentials'), 'Credentials')
assert_equal(set_basic_auth.call_count, 1)

def test_kerberos_authentication(self):
with patch('desktop.lib.idbroker.conf.get_conf') as conf:
with patch('desktop.lib.idbroker.client.is_kerberos_enabled') as is_kerberos_enabled:
with patch('desktop.lib.idbroker.client.resource.Resource.invoke') as invoke:
with patch('desktop.lib.idbroker.client.http_client.HttpClient.set_kerberos_auth') as set_kerberos_auth:
is_kerberos_enabled.return_value = True
with patch('desktop.lib.idbroker.conf.get_cab_address') as get_cab_address:
conf.return_value = {
'fs.s3a.ext.cab.address': 'address',
'fs.s3a.ext.cab.dt.path': 'dt_path',
'fs.s3a.ext.cab.path': 'path',
'hadoop.security.authentication': 'kerberos',
'fs.s3a.ext.cab.username': 'username',
'fs.s3a.ext.cab.password': 'password'
}
invoke.return_value = {
'Credentials': 'Credentials'
}
client = IDBroker.from_core_site('s3a', 'test')
get_cab_address.return_value = 'address'

client = IDBroker.from_core_site('s3a', 'test')
cab = client.get_cab()

assert_equal(invoke.call_count, 2) # get_cab calls twice
assert_equal(cab.get('Credentials'), 'Credentials')
assert_equal(set_kerberos_auth.call_count, 1)
assert_equal(set_basic_auth.call_count, 1)


def test_kerberos_authentication(self):
with patch('desktop.lib.idbroker.conf.get_conf') as conf:
with patch('desktop.lib.idbroker.client.is_kerberos_enabled') as is_kerberos_enabled:
with patch('desktop.lib.idbroker.client.resource.Resource.invoke') as invoke:
with patch('desktop.lib.idbroker.client.http_client.HttpClient.set_kerberos_auth') as set_kerberos_auth:
with patch('desktop.lib.idbroker.conf.get_cab_address') as get_cab_address:
is_kerberos_enabled.return_value = True
conf.return_value = {
'fs.s3a.ext.cab.address': 'address',
'fs.s3a.ext.cab.dt.path': 'dt_path',
'fs.s3a.ext.cab.path': 'path',
'hadoop.security.authentication': 'kerberos',
}
invoke.return_value = {
'Credentials': 'Credentials'
}
get_cab_address.return_value = 'address'

client = IDBroker.from_core_site('s3a', 'test')
cab = client.get_cab()

assert_equal(invoke.call_count, 2) # get_cab calls twice
assert_equal(cab.get('Credentials'), 'Credentials')
assert_equal(set_kerberos_auth.call_count, 1)
4 changes: 2 additions & 2 deletions desktop/libs/aws/src/aws/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ def get_default_get_environment_credentials():
def is_enabled():
return ('default' in list(AWS_ACCOUNTS.keys()) and AWS_ACCOUNTS['default'].get_raw() and AWS_ACCOUNTS['default'].ACCESS_KEY_ID.get()) or \
has_iam_metadata() or \
conf_idbroker.is_idbroker_enabled('s3a') or \
is_raz_s3()
is_raz_s3() or \
conf_idbroker.is_idbroker_enabled('s3a')


def is_ec2_instance():
Expand Down
Loading

0 comments on commit 1be02d8

Please sign in to comment.