Skip to content

Commit

Permalink
[core] Moved namespaces and computes to database
Browse files Browse the repository at this point in the history
The existing implementation was connecting to the external Altus and DWX
APIs to fetch the available computes. This commit changes that by
moving the cluster config to the database. Two new tables have been added:
(a) `beeswax_namespace` to hold the config for a namespace/dialect
(b) `beeswax_compute` to hold the configs for individual compute clusters
linked to the namespaces.
This change currently supports Hive and Impala clusters.

There is also a service-discovery component that keeps the list of
namespaces and computes updated in the corresponding tables.
`sync_warehouses.py` performs the service discovery in the CDW environment
by talking to the Kubernetes APIs. It creates one Hive and one Impala
namespace and one compute for each virtual warehouse. The command is
supposed to run every minute and keep the list of warehouses updated.

There are related changes and fixes to the rest of the code to support
the query execution on different computes.

Change-Id: Ifd8dbc8d716dfe2000fbfa8121e39f2610051fa1
  • Loading branch information
amitsrivastava committed Sep 19, 2023
1 parent a51f01f commit 3594a73
Show file tree
Hide file tree
Showing 18 changed files with 508 additions and 139 deletions.
52 changes: 52 additions & 0 deletions apps/beeswax/src/beeswax/migrations/0003_compute_namespace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Generated by Django 3.2.16 on 2023-09-07 14:30

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Create the ``Namespace`` and ``Compute`` models of the beeswax app."""

    # Must be applied on top of the previous beeswax migration.
    dependencies = [
        ('beeswax', '0002_auto_20200320_0746'),
    ]

    operations = [
        # Namespace: a named entry carrying a dialect and an interface.
        # 'name' is unique (single-column unique_together).
        migrations.CreateModel(
            name='Namespace',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('name', models.CharField(default='', max_length=255)),
                ('description', models.TextField(default='')),
                ('dialect', models.CharField(db_index=True, help_text='Type of namespace, e.g. hive, mysql... ', max_length=32)),
                ('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
                # Nullable, indexed identifier of the namespace in an external system.
                ('external_id', models.CharField(db_index=True, max_length=255, null=True)),
                # auto_now=True: refreshed on every save of the row.
                ('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
            ],
            options={
                'verbose_name': 'namespace',
                'verbose_name_plural': 'namespaces',
                'unique_together': {('name',)},
            },
        ),
        # Compute: a named entry optionally linked to a Namespace.
        # 'name' is unique (single-column unique_together).
        migrations.CreateModel(
            name='Compute',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('name', models.CharField(default='', max_length=255)),
                ('description', models.TextField(default='')),
                ('dialect', models.CharField(db_index=True, help_text='Type of compute, e.g. hive, impala... ', max_length=32)),
                ('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
                ('is_ready', models.BooleanField(default=True)),
                ('external_id', models.CharField(db_index=True, max_length=255, null=True)),
                # Text columns whose defaults are JSON literals ('[]' and '{}').
                ('ldap_groups_json', models.TextField(default='[]')),
                ('settings', models.TextField(default='{}')),
                ('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
                # Nullable link to the owning namespace; deleting the namespace deletes its computes.
                ('namespace', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='beeswax.namespace')),
            ],
            options={
                'verbose_name': 'compute',
                'verbose_name_plural': 'computes',
                'unique_together': {('name',)},
            },
        ),
    ]
106 changes: 105 additions & 1 deletion apps/beeswax/src/beeswax/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from desktop.models import Document, Document2
from desktop.redaction import global_redaction_engine
from librdbms.server import dbms as librdbms_dbms
from useradmin.models import User
from useradmin.models import User, UserProfile

from beeswax.design import HQLdesign

Expand Down Expand Up @@ -607,3 +607,107 @@ def get():
return MetaInstall.objects.get(id=1)
except MetaInstall.DoesNotExist:
return MetaInstall(id=1)

class Namespace(models.Model):
  """
  Named entry with a dialect and an interface that ``Compute`` rows can be linked to.
  """
  name = models.CharField(default='', max_length=255)
  description = models.TextField(default='')
  dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of namespace, e.g. hive, mysql... '))
  interface = models.CharField(
    max_length=32,
    db_index=True,
    help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
    default='sqlalchemy'
  )
  # Nullable, indexed identifier of this namespace in an external system.
  external_id = models.CharField(max_length=255, null=True, db_index=True)
  # auto_now=True: refreshed on every save.
  last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

  class Meta:
    verbose_name = _t('namespace')
    verbose_name_plural = _t('namespaces')
    unique_together = ('name',)

  def get_computes(self, user):
    """
    Return the computes of this namespace that are available to ``user``.

    A compute is available when it declares no LDAP groups, or when at least one of its
    LDAP groups is among the groups found under ``saml_attributes.groups`` in the user's
    profile data.

    :param user: the user to filter for; ``None`` yields an empty list.
    :return: list of ``Compute.to_dict()`` dictionaries, ready computes first, then by name.
    :raises UserProfile.DoesNotExist: when the user has no profile row.
    """
    if user is None:
      return []

    profile = UserProfile.objects.get(user=user)
    # `or` fallbacks: a key that is present but holds None must not crash the lookup.
    saml_attributes = profile.data.get("saml_attributes") or {}
    user_groups = set(saml_attributes.get("groups") or [])

    computes = []
    for compute in Compute.objects.filter(namespace=self):
      # Read the property once: every access re-parses the stored JSON.
      allowed_groups = compute.ldap_groups
      if not allowed_groups or allowed_groups.intersection(user_groups):
        computes.append(compute.to_dict())

    # False sorts before True, so ready computes come first; ties are ordered by name.
    computes.sort(key=lambda c: (not c['is_ready'], c['name']))
    return computes

  def __str__(self):
    return '%s (%s)' % (self.name, self.dialect)

  def to_dict(self):
    """Return the fields of this namespace as a plain dictionary.

    ``type`` is the string form of the primary key and ``last_modified`` is left as a
    ``datetime`` object.
    """
    return {
      'id': self.id,
      'type': str(self.id),
      'name': self.name,
      'description': self.description,
      'dialect': self.dialect,
      'interface': self.interface,
      'external_id': self.external_id,
      'last_modified': self.last_modified
    }

class Compute(models.Model):
  """
  Instance of a compute type pointing to a Hive or Impala compute resources.
  """
  name = models.CharField(default='', max_length=255)
  description = models.TextField(default='')
  dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of compute, e.g. hive, impala... '))
  interface = models.CharField(
    max_length=32,
    db_index=True,
    help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
    default='sqlalchemy'
  )
  # Nullable: a compute may exist without a namespace. Deleting the namespace deletes its computes.
  namespace = models.ForeignKey(Namespace, on_delete=models.CASCADE, null=True)
  is_ready = models.BooleanField(default=True)
  external_id = models.CharField(max_length=255, null=True, db_index=True)
  # JSON-encoded list of group names; read and written through the `ldap_groups` property.
  ldap_groups_json = models.TextField(default='[]')
  # JSON-encoded settings; exposed as a name -> value mapping through the `options` property.
  settings = models.TextField(default='{}')
  last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

  class Meta:
    verbose_name = _t('compute')
    verbose_name_plural = _t('computes')
    unique_together = ('name',)

  def __str__(self):
    return '%s (%s)' % (self.name, self.dialect)

  def to_dict(self):
    """Return the fields of this compute as a plain dictionary.

    ``type`` is ``<dialect>-compute``, ``options`` is the parsed settings mapping and
    ``namespace`` is the name of the linked namespace, or ``None`` when the compute is
    not attached to one. ``last_modified`` is left as a ``datetime`` object.
    """
    # The foreign key is nullable, so guard before dereferencing the namespace.
    namespace = self.namespace
    return {
      'id': self.id,
      'type': self.dialect + '-compute',
      'name': self.name,
      'namespace': namespace.name if namespace else None,
      'description': self.description,
      'dialect': self.dialect,
      'interface': self.interface,
      'is_ready': self.is_ready,
      'options': self.options,
      'external_id': self.external_id,
      'last_modified': self.last_modified
    }

  @property
  def ldap_groups(self):
    """Set of group names decoded from ``ldap_groups_json``; empty when nothing is stored."""
    if not self.ldap_groups_json:
      # Normalize an empty column value to an empty JSON list (in memory only, not saved).
      self.ldap_groups_json = json.dumps([])
    # `or []` covers a stored JSON `null`.
    return set(json.loads(self.ldap_groups_json) or [])

  @ldap_groups.setter
  def ldap_groups(self, val):
    """Store ``val`` (any iterable of group names, or ``None``) as a JSON list."""
    self.ldap_groups_json = json.dumps(list(val or []))

  @property
  def options(self):
    """Settings of this compute as a ``{name: value}`` dictionary.

    Two stored shapes are accepted: a JSON list of ``{"name": ..., "value": ...}`` items,
    or a JSON object (the shape of the column default ``'{}'``), which is returned as a
    copy. An empty or ``null`` payload yields an empty dictionary.
    """
    if not self.settings:
      # Normalize an empty column value to an empty JSON list (in memory only, not saved).
      self.settings = json.dumps([])
    parsed = json.loads(self.settings) or []
    if isinstance(parsed, dict):
      # Already a name -> value mapping; iterating it would yield bare keys and fail below.
      return dict(parsed)
    return {setting['name']: setting['value'] for setting in parsed}
39 changes: 21 additions & 18 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from django.urls import reverse
from kazoo.client import KazooClient

from beeswax.models import Compute
from desktop.conf import CLUSTER_ID, has_connectors
from desktop.lib.django_util import format_preserving_redirect
from desktop.lib.exceptions_renderable import PopupException
Expand Down Expand Up @@ -142,8 +143,9 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and has_connectors(): # TODO: Give empty connector when no connector in use
LOG.debug("Query via connector %s" % name)
if connector and (has_connectors() or connector.get('compute')
or connector.get('type') in ('hive-compute', 'impala-compute')):
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
LOG.debug("Query via ini %s" % name)
Expand Down Expand Up @@ -306,39 +308,40 @@ def get_query_server_config(name='beeswax', connector=None):

def get_query_server_config_via_connector(connector):
# TODO: connector is actually a notebook interpreter
connector_name = full_connector_name = connector['type']
compute_name = None
if connector.get('compute'):
compute_name = connector['compute']['name']
full_connector_name = '%s-%s' % (connector_name, compute_name)
LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute_name))

if connector['options'].get('has_ssh') == 'true':
compute = connector.get('compute', connector)
connector_name = connector['type']
compute_name = compute['name']
if compute.get('id'):
compute = Compute.objects.get(id=compute['id']).to_dict() #Reload the full compute from db
LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute))

if compute['options'].get('has_ssh') == 'true':
server_host = '127.0.0.1'
server_port = connector['options']['server_port']
else:
server_host = (connector['compute']['options'] if 'compute' in connector else connector['options'])['server_host']
server_port = int((connector['compute']['options'] if 'compute' in connector else connector['options'])['server_port'])
server_host = compute['options']['server_host']
server_port = int(compute['options']['server_port'])

if 'impersonation_enabled' in connector['options']:
impersonation_enabled = connector['options']['impersonation_enabled'] == 'true'
if 'impersonation_enabled' in compute['options']:
impersonation_enabled = bool(compute['options']['impersonation_enabled'])
else:
impersonation_enabled = hiveserver2_impersonation_enabled()

return {
'dialect': connector['dialect'],
'server_name': full_connector_name,
'dialect': compute['dialect'],
'server_name': compute_name,
'server_host': server_host,
'server_port': server_port,
'principal': 'TODO',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),

'impersonation_enabled': impersonation_enabled,
'use_sasl': connector['options'].get('use_sasl', 'true') == 'true',
'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
'SESSION_TIMEOUT_S': 15 * 60,
'querycache_rows': 1000,
'QUERY_TIMEOUT_S': 15 * 60,
'transport_mode': compute['options'].get('transport_mode', 'http'),
'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
}


Expand Down
7 changes: 5 additions & 2 deletions apps/beeswax/src/beeswax/server/hive_server2_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,10 @@ def open_session(self, user):
'configuration': {},
}
connector_type = 'hive' if self.query_server['server_name'] == 'beeswax' else self.query_server['server_name']
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = self.query_server['dialect']
if not interpreter_dialect:
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = interpreter.get('dialect')

if self.impersonation_enabled:
kwargs.update({'username': DEFAULT_USER})
Expand All @@ -684,7 +687,7 @@ def open_session(self, user):
if csrf_header and ENABLE_X_CSRF_TOKEN_FOR_HIVE_IMPALA.get():
kwargs['configuration'].update({'X-CSRF-TOKEN': csrf_header})

if self.query_server['server_name'] == 'hplsql' or interpreter['dialect'] == 'hplsql': # All the time
if self.query_server['server_name'] == 'hplsql' or interpreter_dialect == 'hplsql': # All the time
kwargs['configuration'].update({'hive.server2.proxy.user': user.username, 'set:hivevar:mode': 'HPLSQL'})

if self.query_server['server_name'] == 'llap': # All the time
Expand Down
2 changes: 2 additions & 0 deletions apps/beeswax/src/beeswax/server/hive_server2_lib_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def setUp(self):
'use_sasl': True,
'server_host': 'localhost',
'server_port': 10000,
'dialect': 'hive',
'interface': 'hiveserver2'
}

def test_open_session(self):
Expand Down
4 changes: 3 additions & 1 deletion apps/impala/src/impala/dbms_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ def setUp(self):

def test_get_connector_config(self):
connector = {
'type': 'impala-1',
'type': 'impala-compute',
'name': 'impala-1',
'dialect': 'impala',
'interface': 'hiveserver2',
'options': {'server_host': 'gethue.com', 'server_port': 10000}
}

Expand Down
12 changes: 8 additions & 4 deletions apps/jobbrowser/src/jobbrowser/apis/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@
LOG = logging.getLogger()

try:
from beeswax.models import Session
from beeswax.models import Session, Compute
from impala.server import get_api as get_impalad_api, _get_impala_server_url
except ImportError as e:
LOG.exception('Some application are not enabled: %s' % e)


def _get_api(user, cluster=None):
if cluster and cluster.get('type') == 'altus-dw':
server_url = 'http://impala-coordinator-%(name)s:25000' % cluster
compute = cluster['compute'] if cluster.get('compute') else cluster
if compute and compute.get('type') == 'impala-compute':
if compute.get('id') and not (compute.get('options') and compute['options'].get('http_url')):
compute = Compute.objects.get(id=compute['id']).to_dict() # Reload the full compute from db
if compute.get('options') and compute['options'].get('api_url'):
server_url = compute['options'].get('api_url')
else:
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
application = cluster.get('interface', 'impala')
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
session = Session.objects.get_session(user, application=application)
server_url = _get_impala_server_url(session)
return get_impalad_api(user=user, url=server_url)
Expand Down
Loading

0 comments on commit 3594a73

Please sign in to comment.