Skip to content

Commit

Permalink
[core] Moved namespaces and computes to database
Browse files Browse the repository at this point in the history
The existing implementation was connecting to external Altus and DWX
api's to fetch the available computes. This commit changes that by
moving the cluster config to database. Two new tables have been added
(a) `beeswax_namespace` to hold config for a namespace/dialect
(b) `beeswax_compute` to hold configs for individual compute clusters
linked to the namespaces.
This change currently support hive and impala clusters.

There is also a service-discovery component that keeps the list of
namespaces and computes updated in the corresponding tables.
`sync_warehouses.py` performs the service discovery in the CDW environ
by talking to kubernetes api's. It creates one Hive and one Impala
namespace and one compute for each virtual warehouse. The command is
supposed to run every minute and keep the list of warehouses updated.

There are related changes and fixes to the rest of the code to support
the query execution on different computes.

Change-Id: Ifd8dbc8d716dfe2000fbfa8121e39f2610051fa1
  • Loading branch information
amitsrivastava committed Sep 14, 2023
1 parent e0738cd commit 074dae5
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 103 deletions.
52 changes: 52 additions & 0 deletions apps/beeswax/src/beeswax/migrations/0003_compute_namespace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Generated by Django 3.2.16 on 2023-09-07 14:30

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('beeswax', '0002_auto_20200320_0746'),
]

operations = [
migrations.CreateModel(
name='Namespace',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(default='', max_length=255)),
('description', models.TextField(default='')),
('dialect', models.CharField(db_index=True, help_text='Type of namespace, e.g. hive, mysql... ', max_length=32)),
('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
('external_id', models.CharField(db_index=True, max_length=255, null=True)),
('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
],
options={
'verbose_name': 'namespace',
'verbose_name_plural': 'namespaces',
'unique_together': {('name',)},
},
),
migrations.CreateModel(
name='Compute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(default='', max_length=255)),
('description', models.TextField(default='')),
('dialect', models.CharField(db_index=True, help_text='Type of compute, e.g. hive, impala... ', max_length=32)),
('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
('is_ready', models.BooleanField(default=True)),
('external_id', models.CharField(db_index=True, max_length=255, null=True)),
('ldap_groups_json', models.TextField(default='[]')),
('settings', models.TextField(default='{}')),
('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
('namespace', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='beeswax.namespace')),
],
options={
'verbose_name': 'compute',
'verbose_name_plural': 'computes',
'unique_together': {('name',)},
},
),
]
106 changes: 105 additions & 1 deletion apps/beeswax/src/beeswax/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from desktop.models import Document, Document2
from desktop.redaction import global_redaction_engine
from librdbms.server import dbms as librdbms_dbms
from useradmin.models import User
from useradmin.models import User, UserProfile

from beeswax.design import HQLdesign

Expand Down Expand Up @@ -607,3 +607,107 @@ def get():
return MetaInstall.objects.get(id=1)
except MetaInstall.DoesNotExist:
return MetaInstall(id=1)

class Namespace(models.Model):
name = models.CharField(default='', max_length=255)
description = models.TextField(default='')
dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of namespace, e.g. hive, mysql... '))
interface = models.CharField(
max_length=32,
db_index=True,
help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
default='sqlalchemy'
)
external_id = models.CharField(max_length=255, null=True, db_index=True)
last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

class Meta:
verbose_name = _t('namespace')
verbose_name_plural = _t('namespaces')
unique_together = ('name',)

def get_computes(self, user):
"""Returns the computes belonging to the current namespace that are available to the current user."""
if user is None:
return []
profile = UserProfile.objects.get(user=user)
user_groups = set(profile.data.get("saml_attributes", {}).get("groups", []))

computes = Compute.objects.filter(namespace=self)
computes = [co.to_dict() for co in computes if not co.ldap_groups or co.ldap_groups.intersection(user_groups)]
computes.sort(key = lambda c: (not c['is_ready'], c['name']))
return computes

def __str__(self):
return '%s (%s)' % (self.name, self.dialect)

def to_dict(self):
return {
'id': self.id,
'type': str(self.id),
'name': self.name,
'description': self.description,
'dialect': self.dialect,
'interface': self.interface,
'external_id': self.external_id,
'last_modified': self.last_modified
}

class Compute(models.Model):
"""
Instance of a compute type pointing to a Hive or Impala compute resources.
"""
name = models.CharField(default='', max_length=255)
description = models.TextField(default='')
dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of compute, e.g. hive, impala... '))
interface = models.CharField(
max_length=32,
db_index=True,
help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
default='sqlalchemy'
)
namespace = models.ForeignKey(Namespace, on_delete=models.CASCADE, null=True)
is_ready = models.BooleanField(default=True)
external_id = models.CharField(max_length=255, null=True, db_index=True)
ldap_groups_json = models.TextField(default='[]')
settings = models.TextField(default='{}')
last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

class Meta:
verbose_name = _t('compute')
verbose_name_plural = _t('computes')
unique_together = ('name',)

def __str__(self):
return '%s (%s)' % (self.name, self.dialect)

def to_dict(self):
return {
'id': self.id,
'type': self.dialect + '-compute',
'name': self.name,
'namespace': self.namespace.name,
'description': self.description,
'dialect': self.dialect,
'interface': self.interface,
'is_ready': self.is_ready,
'options': self.options,
'external_id': self.external_id,
'last_modified': self.last_modified
}

@property
def ldap_groups(self):
if not self.ldap_groups_json:
self.ldap_groups_json = json.dumps([])
return set(json.loads(self.ldap_groups_json))

@ldap_groups.setter
def ldap_groups(self, val):
self.ldap_groups_json = json.dumps(list(val or []))

@property
def options(self):
if not self.settings:
self.settings = json.dumps([])
return {setting['name']: setting['value'] for setting in json.loads(self.settings)}
31 changes: 15 additions & 16 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and has_connectors(): # TODO: Give empty connector when no connector in use
LOG.debug("Query via connector %s" % name)
if connector and (has_connectors() or connector.get('compute') or connector.get('type') in ('hive-compute', 'impala-compute')): # TODO: Give empty connector when no connector in use
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
LOG.debug("Query via ini %s" % name)
Expand Down Expand Up @@ -306,39 +306,38 @@ def get_query_server_config(name='beeswax', connector=None):

def get_query_server_config_via_connector(connector):
# TODO: connector is actually a notebook interpreter
connector_name = full_connector_name = connector['type']
compute_name = None
if connector.get('compute'):
compute_name = connector['compute']['name']
full_connector_name = '%s-%s' % (connector_name, compute_name)
compute = connector.get('compute', connector)
connector_name = connector['type']
compute_name = compute['name']
LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute_name))

if connector['options'].get('has_ssh') == 'true':
if compute['options'].get('has_ssh') == 'true':
server_host = '127.0.0.1'
server_port = connector['options']['server_port']
else:
server_host = (connector['compute']['options'] if 'compute' in connector else connector['options'])['server_host']
server_port = int((connector['compute']['options'] if 'compute' in connector else connector['options'])['server_port'])
server_host = compute['options']['server_host']
server_port = int(compute['options']['server_port'])

if 'impersonation_enabled' in connector['options']:
impersonation_enabled = connector['options']['impersonation_enabled'] == 'true'
if 'impersonation_enabled' in compute['options']:
impersonation_enabled = bool(compute['options']['impersonation_enabled'])
else:
impersonation_enabled = hiveserver2_impersonation_enabled()

return {
'dialect': connector['dialect'],
'server_name': full_connector_name,
'dialect': compute['dialect'],
'server_name': compute_name,
'server_host': server_host,
'server_port': server_port,
'principal': 'TODO',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),

'impersonation_enabled': impersonation_enabled,
'use_sasl': connector['options'].get('use_sasl', 'true') == 'true',
'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
'SESSION_TIMEOUT_S': 15 * 60,
'querycache_rows': 1000,
'QUERY_TIMEOUT_S': 15 * 60,
'transport_mode': compute['options'].get('transport_mode', 'http'),
'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
}


Expand Down
7 changes: 5 additions & 2 deletions apps/beeswax/src/beeswax/server/hive_server2_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,10 @@ def open_session(self, user):
'configuration': {},
}
connector_type = 'hive' if self.query_server['server_name'] == 'beeswax' else self.query_server['server_name']
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = self.query_server['dialect']
if not interpreter_dialect:
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = interpreter.get('dialect')

if self.impersonation_enabled:
kwargs.update({'username': DEFAULT_USER})
Expand All @@ -684,7 +687,7 @@ def open_session(self, user):
if csrf_header and ENABLE_X_CSRF_TOKEN_FOR_HIVE_IMPALA.get():
kwargs['configuration'].update({'X-CSRF-TOKEN': csrf_header})

if self.query_server['server_name'] == 'hplsql' or interpreter['dialect'] == 'hplsql': # All the time
if self.query_server['server_name'] == 'hplsql' or interpreter_dialect == 'hplsql': # All the time
kwargs['configuration'].update({'hive.server2.proxy.user': user.username, 'set:hivevar:mode': 'HPLSQL'})

if self.query_server['server_name'] == 'llap': # All the time
Expand Down
2 changes: 1 addition & 1 deletion apps/jobbrowser/src/jobbrowser/apis/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _get_api(user, cluster=None):
server_url = 'http://impala-coordinator-%(name)s:25000' % cluster
else:
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
application = cluster.get('interface', 'impala')
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
session = Session.objects.get_session(user, application=application)
server_url = _get_impala_server_url(session)
return get_impalad_api(user=user, url=server_url)
Expand Down
1 change: 1 addition & 0 deletions desktop/core/base_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jaeger-client==4.3.0
jdcal==1.0.1
kazoo==2.8.0
kerberos==1.3.0
kubernetes==26.1.0
lockfile==0.12.2
Mako==1.2.3
Markdown==3.1
Expand Down
Loading

0 comments on commit 074dae5

Please sign in to comment.