Skip to content

Commit

Permalink
Merge pull request #4 from ContinuumIO/query
Browse files (browse the repository at this point in the history)
allow queries in get_table
  • Loading branch information
AlbertDeFusco authored Oct 28, 2021
2 parents 49e295a + 4ff6eec commit c28d36e
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions intake_metabase/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class MetabaseCatalog(Catalog):
version = __version__
# partition_access = False

def __init__(self, domain, username=None, password=None, token=None, metadata=None):
def __init__(self, domain, username=None, password=None, token=None, metadata=None, name=None):
self.name = name
self.domain = domain
self.username = username
self.password = password
Expand Down Expand Up @@ -94,7 +95,7 @@ def _get_schema(self):
self._df = self._metabase.get_card(self.question)

return Schema(datashape=None,
dtype=self._df.dtypes,
dtype={n: str(t) for (n, t) in self._df.dtypes.items()},
shape=(None, len(self._df.columns)),
npartitions=1,
extra_metadata={})
Expand All @@ -111,7 +112,7 @@ def to_dask(self):
raise NotImplementedError()

def _close(self):
self._dataframe = None
self._df = None


class MetabaseTableSource(DataSource):
Expand All @@ -120,13 +121,14 @@ class MetabaseTableSource(DataSource):
version = __version__
partition_access = True

def __init__(self, domain, database, table, username=None, password=None, token=None, metadata=None):
def __init__(self, domain, database, table=None, query=None, username=None, password=None, token=None, metadata=None):
self.domain = domain
self.username = username
self.password = password
self.database = database
self.token = token
self.table = table
self.query = query
self._df = None

self._metabase = MetabaseAPI(self.domain, self.username, self.password, self.token)
Expand All @@ -135,10 +137,10 @@ def __init__(self, domain, database, table, username=None, password=None, token=

def _get_schema(self):
if self._df is None:
self._df = self._metabase.get_table(self.database, self.table)
self._df = self._metabase.get_table(self.database, self.table, self.query)

return Schema(datashape=None,
dtype=self._df.dtypes,
dtype={n: str(t) for (n, t) in self._df.dtypes.items()},
shape=(None, len(self._df.columns)),
npartitions=1,
extra_metadata={})
Expand All @@ -155,7 +157,7 @@ def to_dask(self):
raise NotImplementedError()

def _close(self):
self._dataframe = None
self._df = None


class MetabaseAPI():
Expand Down Expand Up @@ -246,23 +248,38 @@ def get_card(self, question):
return pd.read_csv(StringIO(csv.encode(res.encoding).decode('utf-8')),
parse_dates=date_fields, infer_datetime_format=True)

def get_table(self, database, table):
def get_table(self, database, table=None, query=None):
from io import StringIO

import pandas as pd

self._create_or_refresh_token()

table_metadata = self.get_metadata(table)
date_fields = [f['display_name'] for f in table_metadata['fields']
if 'date' in f['base_type'].lower()]
if (table is not None) and (query is not None):
raise ValueError('Please set only one of table or query')

kwargs = {}
if table is not None:
table_metadata = self.get_metadata(table)
key = 'name' if query is not None else 'display_name'
date_fields = [f[key] for f in table_metadata['fields']
if 'date' in f['base_type'].lower()]
kwargs = {
'parse_dates': date_fields,
'infer_datetime_format': True
}

body = {
"database": database,
"query": {"source-table": table},
"type": "query",
}

if query is None:
body['type'] = 'query'
body['query'] = {'source-table': table}
else:
body['type'] = 'native'
body['native'] = {'query': query}

headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'X-Metabase-Session': self._token
Expand All @@ -278,4 +295,4 @@ def get_table(self, database, table):
csv = res.text

return pd.read_csv(StringIO(csv.encode(res.encoding).decode('utf-8')),
parse_dates=date_fields, infer_datetime_format=True)
**kwargs)

0 comments on commit c28d36e

Please sign in to comment.