Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Moved namespaces and computes to database #3446

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions apps/beeswax/src/beeswax/migrations/0003_compute_namespace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Generated by Django 3.2.16 on 2023-09-07 14:30

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('beeswax', '0002_auto_20200320_0746'),
]

operations = [
migrations.CreateModel(
name='Namespace',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(default='', max_length=255)),
('description', models.TextField(default='')),
('dialect', models.CharField(db_index=True, help_text='Type of namespace, e.g. hive, mysql... ', max_length=32)),
('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
('external_id', models.CharField(db_index=True, max_length=255, null=True)),
('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
],
options={
'verbose_name': 'namespace',
'verbose_name_plural': 'namespaces',
'unique_together': {('name',)},
},
),
migrations.CreateModel(
name='Compute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(default='', max_length=255)),
('description', models.TextField(default='')),
('dialect', models.CharField(db_index=True, help_text='Type of compute, e.g. hive, impala... ', max_length=32)),
('interface', models.CharField(db_index=True, default='sqlalchemy', help_text='Type of interface, e.g. sqlalchemy, hiveserver2... ', max_length=32)),
('is_ready', models.BooleanField(default=True)),
('external_id', models.CharField(db_index=True, max_length=255, null=True)),
('ldap_groups_json', models.TextField(default='[]')),
('settings', models.TextField(default='{}')),
('last_modified', models.DateTimeField(auto_now=True, db_index=True, verbose_name='Time last modified')),
('namespace', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='beeswax.namespace')),
],
options={
'verbose_name': 'compute',
'verbose_name_plural': 'computes',
'unique_together': {('name',)},
},
),
]
106 changes: 105 additions & 1 deletion apps/beeswax/src/beeswax/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from desktop.models import Document, Document2
from desktop.redaction import global_redaction_engine
from librdbms.server import dbms as librdbms_dbms
from useradmin.models import User
from useradmin.models import User, UserProfile

from beeswax.design import HQLdesign

Expand Down Expand Up @@ -607,3 +607,107 @@ def get():
return MetaInstall.objects.get(id=1)
except MetaInstall.DoesNotExist:
return MetaInstall(id=1)

class Namespace(models.Model):
name = models.CharField(default='', max_length=255)
description = models.TextField(default='')
dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of namespace, e.g. hive, mysql... '))
interface = models.CharField(
max_length=32,
db_index=True,
help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
default='sqlalchemy'
)
external_id = models.CharField(max_length=255, null=True, db_index=True)
last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

class Meta:
verbose_name = _t('namespace')
verbose_name_plural = _t('namespaces')
unique_together = ('name',)

def get_computes(self, user):
"""Returns the computes belonging to the current namespace that are available to the current user."""
if user is None:
return []
profile = UserProfile.objects.get(user=user)
user_groups = set(profile.data.get("saml_attributes", {}).get("groups", []))

computes = Compute.objects.filter(namespace=self)
computes = [co.to_dict() for co in computes if not co.ldap_groups or co.ldap_groups.intersection(user_groups)]
computes.sort(key=lambda c: (not c['is_ready'], c['name']))
return computes

def __str__(self):
return '%s (%s)' % (self.name, self.dialect)

def to_dict(self):
return {
'id': self.id,
'type': str(self.id),
'name': self.name,
'description': self.description,
'dialect': self.dialect,
'interface': self.interface,
'external_id': self.external_id,
'last_modified': self.last_modified
}

class Compute(models.Model):
"""
Instance of a compute type pointing to a Hive or Impala compute resources.
"""
name = models.CharField(default='', max_length=255)
description = models.TextField(default='')
dialect = models.CharField(max_length=32, db_index=True, help_text=_t('Type of compute, e.g. hive, impala... '))
interface = models.CharField(
max_length=32,
db_index=True,
help_text=_t('Type of interface, e.g. sqlalchemy, hiveserver2... '),
default='sqlalchemy'
)
namespace = models.ForeignKey(Namespace, on_delete=models.CASCADE, null=True)
is_ready = models.BooleanField(default=True)
external_id = models.CharField(max_length=255, null=True, db_index=True)
ldap_groups_json = models.TextField(default='[]')
settings = models.TextField(default='{}')
last_modified = models.DateTimeField(auto_now=True, db_index=True, verbose_name=_t('Time last modified'))

class Meta:
verbose_name = _t('compute')
verbose_name_plural = _t('computes')
unique_together = ('name',)

def __str__(self):
return '%s (%s)' % (self.name, self.dialect)

def to_dict(self):
return {
'id': self.id,
'type': self.dialect + '-compute',
'name': self.name,
'namespace': self.namespace.name,
'description': self.description,
'dialect': self.dialect,
'interface': self.interface,
'is_ready': self.is_ready,
'options': self.options,
'external_id': self.external_id,
'last_modified': self.last_modified
}

@property
def ldap_groups(self):
if not self.ldap_groups_json:
self.ldap_groups_json = json.dumps([])
return set(json.loads(self.ldap_groups_json))

@ldap_groups.setter
def ldap_groups(self, val):
self.ldap_groups_json = json.dumps(list(val or []))

@property
def options(self):
if not self.settings:
self.settings = json.dumps([])
return {setting['name']: setting['value'] for setting in json.loads(self.settings)}
39 changes: 21 additions & 18 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from django.urls import reverse
from kazoo.client import KazooClient

from beeswax.models import Compute
from desktop.conf import CLUSTER_ID, has_connectors
from desktop.lib.django_util import format_preserving_redirect
from desktop.lib.exceptions_renderable import PopupException
Expand Down Expand Up @@ -142,8 +143,9 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and has_connectors(): # TODO: Give empty connector when no connector in use
LOG.debug("Query via connector %s" % name)
if connector and (has_connectors() or connector.get('compute')
or connector.get('type') in ('hive-compute', 'impala-compute')):
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
LOG.debug("Query via ini %s" % name)
Expand Down Expand Up @@ -306,39 +308,40 @@ def get_query_server_config(name='beeswax', connector=None):

def get_query_server_config_via_connector(connector):
amitsrivastava marked this conversation as resolved.
Show resolved Hide resolved
# TODO: connector is actually a notebook interpreter
connector_name = full_connector_name = connector['type']
compute_name = None
if connector.get('compute'):
compute_name = connector['compute']['name']
full_connector_name = '%s-%s' % (connector_name, compute_name)
LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute_name))

if connector['options'].get('has_ssh') == 'true':
compute = connector.get('compute', connector)
amitsrivastava marked this conversation as resolved.
Show resolved Hide resolved
connector_name = connector['type']
compute_name = compute['name']
if compute.get('id'):
compute = Compute.objects.get(id=compute['id']).to_dict() #Reload the full compute from db
LOG.debug("Query cluster connector %s compute %s" % (connector_name, compute))

if compute['options'].get('has_ssh') == 'true':
server_host = '127.0.0.1'
server_port = connector['options']['server_port']
else:
server_host = (connector['compute']['options'] if 'compute' in connector else connector['options'])['server_host']
server_port = int((connector['compute']['options'] if 'compute' in connector else connector['options'])['server_port'])
server_host = compute['options']['server_host']
server_port = int(compute['options']['server_port'])

if 'impersonation_enabled' in connector['options']:
impersonation_enabled = connector['options']['impersonation_enabled'] == 'true'
if 'impersonation_enabled' in compute['options']:
impersonation_enabled = bool(compute['options']['impersonation_enabled'])
else:
impersonation_enabled = hiveserver2_impersonation_enabled()

return {
'dialect': connector['dialect'],
'server_name': full_connector_name,
'dialect': compute['dialect'],
'server_name': compute_name,
'server_host': server_host,
'server_port': server_port,
'principal': 'TODO',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get(),

'impersonation_enabled': impersonation_enabled,
'use_sasl': connector['options'].get('use_sasl', 'true') == 'true',
'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
'SESSION_TIMEOUT_S': 15 * 60,
'querycache_rows': 1000,
'QUERY_TIMEOUT_S': 15 * 60,
'transport_mode': compute['options'].get('transport_mode', 'http'),
'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
}


Expand Down
7 changes: 5 additions & 2 deletions apps/beeswax/src/beeswax/server/hive_server2_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,10 @@ def open_session(self, user):
'configuration': {},
}
connector_type = 'hive' if self.query_server['server_name'] == 'beeswax' else self.query_server['server_name']
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = self.query_server['dialect']
if not interpreter_dialect:
interpreter = get_interpreter(connector_type=connector_type, user=self.user)
interpreter_dialect = interpreter.get('dialect')

if self.impersonation_enabled:
kwargs.update({'username': DEFAULT_USER})
Expand All @@ -684,7 +687,7 @@ def open_session(self, user):
if csrf_header and ENABLE_X_CSRF_TOKEN_FOR_HIVE_IMPALA.get():
kwargs['configuration'].update({'X-CSRF-TOKEN': csrf_header})

if self.query_server['server_name'] == 'hplsql' or interpreter['dialect'] == 'hplsql': # All the time
if self.query_server['server_name'] == 'hplsql' or interpreter_dialect == 'hplsql': # All the time
kwargs['configuration'].update({'hive.server2.proxy.user': user.username, 'set:hivevar:mode': 'HPLSQL'})

if self.query_server['server_name'] == 'llap': # All the time
Expand Down
2 changes: 2 additions & 0 deletions apps/beeswax/src/beeswax/server/hive_server2_lib_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def setUp(self):
'use_sasl': True,
'server_host': 'localhost',
'server_port': 10000,
'dialect': 'hive',
'interface': 'hiveserver2'
}

def test_open_session(self):
Expand Down
4 changes: 3 additions & 1 deletion apps/impala/src/impala/dbms_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ def setUp(self):

def test_get_connector_config(self):
connector = {
'type': 'impala-1',
'type': 'impala-compute',
'name': 'impala-1',
'dialect': 'impala',
'interface': 'hiveserver2',
'options': {'server_host': 'gethue.com', 'server_port': 10000}
}

Expand Down
12 changes: 8 additions & 4 deletions apps/jobbrowser/src/jobbrowser/apis/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@
LOG = logging.getLogger()

try:
from beeswax.models import Session
from beeswax.models import Session, Compute
from impala.server import get_api as get_impalad_api, _get_impala_server_url
except ImportError as e:
LOG.exception('Some application are not enabled: %s' % e)


def _get_api(user, cluster=None):
if cluster and cluster.get('type') == 'altus-dw':
server_url = 'http://impala-coordinator-%(name)s:25000' % cluster
compute = cluster['compute'] if cluster.get('compute') else cluster
if compute and compute.get('type') == 'impala-compute':
if compute.get('id') and not (compute.get('options') and compute['options'].get('http_url')):
compute = Compute.objects.get(id=compute['id']).to_dict() # Reload the full compute from db
if compute.get('options') and compute['options'].get('api_url'):
server_url = compute['options'].get('api_url')
else:
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
application = cluster.get('interface', 'impala')
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
session = Session.objects.get_session(user, application=application)
server_url = _get_impala_server_url(session)
return get_impalad_api(user=user, url=server_url)
Expand Down
Loading
Loading