Skip to content

Commit

Permalink
Post Connect 22 consolidation
Browse files Browse the repository at this point in the history
Data Connect mapping examples
FHIR examples from NIH ODSS FHIR training
DRS issue illustration
Work using IDC tables for bundle alternate illustration
  • Loading branch information
ianfore committed Jun 12, 2022
1 parent 2c4a6b3 commit 914420b
Show file tree
Hide file tree
Showing 26 changed files with 45,700 additions and 36,942 deletions.
3 changes: 2 additions & 1 deletion fasp/loc/sbdrsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def getObject(self, object_id):
resp = response.content.decode('utf-8')
return json.loads(resp)
else:
return response.status_code
print (response.raise_for_status())
return None

def getHeaders(self):
    """Return the HTTP headers carrying the Seven Bridges auth token."""
    auth_headers = {'X-SBG-Auth-Token': self.access_token}
    return auth_headers
Expand Down
126 changes: 126 additions & 0 deletions fasp/search/MappingLibrary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from google.cloud import bigquery
import pandas as pd
import json

class MappingLibraryClient:
    """Client for a BigQuery-backed library of metadata mappings.

    Mappings are stored in the `isbcgc-216220.metadata.mapping` table; the
    per-value entries of a JSON map are stored in `metadata.value_map`.
    Requires Google application-default credentials to be configured.
    """

    def __init__(self):
        self.bqclient = bigquery.Client()
        self.dataset_ref = self.bqclient.dataset('metadata')
        table_ref = self.dataset_ref.table('mapping')
        self.map_table = self.bqclient.get_table(table_ref)

    def getNewMapId(self):
        """Return the next unused map_id (max existing id + 1).

        :return: int id; 1 when the mapping table is empty.
        """
        query = "SELECT max(map_id) max_id FROM `isbcgc-216220.metadata.mapping` m "
        query_job = self.bqclient.query(query)  # Send the query
        for row in query_job:
            # Aggregate query yields a single row; max_id is None when the
            # table is empty (the original code crashed in that case).
            return (row.max_id or 0) + 1

    def getMappingsForVars(self, varList, returnType=None):
        """Look up mappings whose from_scheme is any of the given variables.

        :param varList: list of variable identifiers (from_scheme values)
        :param returnType: 'dataframe' for a pandas DataFrame, else a list of dicts
        :return: mappings as a DataFrame or list of dicts
        """
        if not varList:
            # Guard: an empty IN-list would produce malformed SQL.
            return pd.DataFrame() if returnType == 'dataframe' else []
        # NOTE(security): values are interpolated into the SQL string;
        # callers must not pass untrusted input.
        varString = '({})'.format(','.join(f"'{v}'" for v in varList))
        query = f"""
        SELECT map_id, m.map_type, m.from_scheme, m.to_scheme, m.to_vocab_id
        FROM `isbcgc-216220.metadata.mapping` m
        where m.from_scheme in {varString} """
        query_job = self.bqclient.query(query)  # Send the query
        queryResults = [{'map_id': row.map_id,
                         'type': row.map_type,
                         'from': row.from_scheme,
                         'to': row.to_scheme}
                        for row in query_job]

        if returnType == 'dataframe':
            # Bug fix: previously the DataFrame was printed and None returned.
            return pd.DataFrame(queryResults)
        return queryResults

    def getMappingsForVar(self, varID, returnType=None):
        """Look up mappings for a single variable identifier.

        :param varID: variable identifier (from_scheme value)
        :param returnType: 'dataframe' for a pandas DataFrame, else raw result rows
        :return: mappings as a DataFrame or list of result rows
        """
        query = f"""
        SELECT map_id, m.map_type, m.to_scheme, m.to_vocab_id
        FROM `isbcgc-216220.metadata.mapping` m
        where m.from_scheme = '{varID}'"""
        query_job = self.bqclient.query(query)  # Send the query
        queryResults = list(query_job)

        if returnType == 'dataframe':
            return pd.DataFrame([dict(r.items()) for r in queryResults])
        return queryResults

    def getJsonMap(self, map_id):
        """Fetch the value map for a given map_id.

        :param map_id: id of the mapping whose values to retrieve
        :return: dict of {int(source code): mapped value id}
        """
        query = f"""
        SELECT valueString, mapto_value_id
        FROM `isbcgc-216220.metadata.value_map`
        where map_id = {map_id} """
        query_job = self.bqclient.query(query)  # Send the query
        # Codes are stored as strings; keys are returned as ints.
        return {int(row.valueString): row.mapto_value_id for row in query_job}

    def addJsonMap(self, from_scheme, to_scheme, to_vocab_id, json_map):
        """Register a new json_map mapping and insert its value rows.

        :param json_map: dict of {source value: mapped value}
        :return: the newly allocated map_id
        :raises RuntimeError: if any BigQuery insert reports errors
        """
        map_id = self.getNewMapId()
        map_row = [(map_id, from_scheme, to_scheme, to_vocab_id, u'', u'json_map', u'')]
        errors = self.bqclient.insert_rows(self.map_table, map_row)
        # Raise explicitly instead of assert (asserts are stripped under -O).
        if errors:
            raise RuntimeError(f'Failed to insert mapping row: {errors}')

        item_table_ref = self.dataset_ref.table('value_map')
        item_table = self.bqclient.get_table(item_table_ref)
        value_rows = [(map_id, k, v) for k, v in json_map.items()]
        errors = self.bqclient.insert_rows(item_table, value_rows)
        if errors:
            raise RuntimeError(f'Failed to insert value rows: {errors}')
        return map_id

    def addColnameMap(self, from_scheme, to_scheme):
        """Register a new column-name (colname) mapping.

        :return: the newly allocated map_id
        :raises RuntimeError: if the BigQuery insert reports errors
        """
        map_id = self.getNewMapId()
        map_row = [(map_id, from_scheme, to_scheme, u'', u'', u'colname', u'')]
        errors = self.bqclient.insert_rows(self.map_table, map_row)
        if errors:
            raise RuntimeError(f'Failed to insert mapping row: {errors}')

        return map_id


if __name__ == "__main__":
    # Smoke test: requires BigQuery application-default credentials.
    client = MappingLibraryClient()
    mappings = client.getMappingsForVar('phv00159572.v4')
    print(mappings)
76 changes: 70 additions & 6 deletions fasp/search/data_connect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json

from fasp.loc import GA4GHRegistryClient
from fasp.search.MappingLibrary import MappingLibraryClient
import pandas as pd

class DataConnectClient:
Expand Down Expand Up @@ -45,7 +46,7 @@ def listTables(self, requestedCatalog=None, verbose=True):
next_url = "{}{}{}".format(self.hostURL,'/tables/catalog/',requestedCatalog)
pageCount = 0
if verbose:
print("_Retrieving the table list_")
print("Retrieving the table list")
while next_url != None :
pageCount += 1
if verbose:
Expand All @@ -67,7 +68,7 @@ def listTables(self, requestedCatalog=None, verbose=True):
def listCatalogs(self):
url = self.hostURL + "/tables"

print ("_Retrieving the catalog list_")
print ("Retrieving the catalog list")
response = requests.get(url, headers=self.headers)
result = (response.json())
for t in result['index']:
Expand All @@ -76,7 +77,7 @@ def listCatalogs(self):


def listCatalog(self, catalog):
self.listTables(catalog)
return self.listTables(catalog)

def listTableInfo(self, table, verbose=False):
url = "{}/table/{}/info".format(self.hostURL,table)
Expand Down Expand Up @@ -132,6 +133,49 @@ def getMappingTemplate(self, table, propList=None):
return template


def getDecodeTemplate(self, table, propList=None, numericCodes=True):
    """Get a template which maps enumerated codes to their decoded values.

    :param table: table for which to generate a mapping template
    :param propList: optional list of property names to include; None means all
    :param numericCodes: return codes as integers - will fail (ValueError)
        if a code is not numeric
    :return: dict of {property name: {code: decoded title}} for every
        property that declares a 'oneOf' enumeration
    """
    schema = self.listTableInfo(table).schema
    template = {}
    for prop, details in schema['data_model']['properties'].items():
        # Guard clauses replace the original nested ifs ('== None' fixed).
        if propList is not None and prop not in propList:
            continue
        if 'oneOf' not in details:
            continue
        vList = {}
        for v in details['oneOf']:
            code = int(v['const']) if numericCodes else v['const']
            vList[code] = v['title']
        template[prop] = vList
    return template

def getMappingsForTable(self, table):
    """Find mappings registered in the mapping library for a table's variables.

    Looks up each column's $id in the mapping library and annotates each
    returned mapping with the column it maps from.
    :param table: table whose columns should be looked up
    :return: list of mapping dicts, each with an added 'fromCol' key
    """
    modl = self.listTableInfo(table)
    props = modl.schema['data_model']['properties']
    # Invert the schema: variable $id -> column name.
    vLookUp = {v['$id']: p for p, v in props.items()}
    if self.debug:
        print(vLookUp)
    # Find the mappings
    mcl = MappingLibraryClient()
    varList = mcl.getMappingsForVars(list(vLookUp.keys()))
    if self.debug:
        print(varList)
    # Add column names to the mappings (dicts mutated in place; the manual
    # index counter of the original is unnecessary).
    for var in varList:
        var['fromCol'] = vLookUp[var['from']]
    return varList

def runOneTableQuery(self, column_list, table, limit):
col_string = ", ".join(column_list)

Expand All @@ -140,7 +184,20 @@ def runOneTableQuery(self, column_list, table, limit):
res = self.runQuery(query, returnType='dataframe')
return res

def runQuery(self, query, returnType=None):
def getDataFrameFromTable(self, table, column_list=None, limit=1000):
    """Run a simple select against a table and return the results as a DataFrame.

    :param table: table to query
    :param column_list: list of column names, or a pre-joined string;
        empty/None selects all columns
    :param limit: maximum number of rows to retrieve
    :return: pandas DataFrame of results
    """
    # Bug fix: the original called list.join(','), which raises
    # AttributeError for any non-empty list (join is a str method) and
    # discarded the result anyway. Also avoids a mutable default argument.
    if column_list is None or (isinstance(column_list, list) and not column_list):
        col_string = '*'
    elif isinstance(column_list, list):
        col_string = ', '.join(column_list)
    else:
        col_string = column_list
    query = f"select {col_string} from {table} limit {limit}"
    print(query)
    res = self.runQuery(query, returnType='dataframe')
    if res.shape[0] >= limit:
        print(f'The number of rows was limited to {limit}. Try setting limit=your_value if you need more data')
    return res

def runQuery(self, query, returnType=None, progessIndicator=None):

query = query.replace("\n", " ").replace("\t", " ")
query2 = "{\"query\":\"%s\"}" % query
Expand All @@ -150,10 +207,14 @@ def runQuery(self, query, returnType=None):
pageCount = 0
resultRows = []
column_list = []
print ("_Retrieving the query_")
if not progessIndicator:
print ("Retrieving the query")
while next_url != None :
pageCount += 1
print ("____Page{}_______________".format(pageCount))
if progessIndicator:
progessIndicator.value += 1
else:
print ("____Page{}_______________".format(pageCount))
if pageCount == 1:
response = requests.request("POST", next_url,
headers=self.headers, data = query2)
Expand All @@ -177,6 +238,9 @@ def runQuery(self, query, returnType=None):
if self.debug: print('found data model')
column_list = result['data_model']['properties'].keys()

if progessIndicator:
progessIndicator.value = progessIndicator.max

if returnType == 'dataframe':
df = pd.DataFrame(resultRows, columns=column_list, index=None)
return df
Expand Down
Loading

0 comments on commit 914420b

Please sign in to comment.