Skip to content

Commit

Permalink
Merge pull request #284 from madgik/Task-#220CSV-to-DB-converter-to-be-improved
Browse files Browse the repository at this point in the history

Task #220 csv to db converter to be improved
  • Loading branch information
ThanKarab authored Oct 2, 2020
2 parents 02836ae + c8cf142 commit e47ce16
Showing 1 changed file with 93 additions and 72 deletions.
165 changes: 93 additions & 72 deletions Exareme-Docker/files/root/exareme/convert-csv-dataset-to-db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
This script creates multiple dbs for each pathology folder containing a dataset csv file and a metadata json file.
"""

import os
import sys
import csv
import sqlite3
import json
import os
import sqlite3
from argparse import ArgumentParser

MAX_ROWS_TO_INSERT_INTO_SQL = 100


# This metadata dictionary contains only code and sqltype so that processing will be faster
# It also includes the subjectcode
def createMetadataDictionary(CDEsMetadataPath):
Expand All @@ -22,20 +24,21 @@ def createMetadataDictionary(CDEsMetadataPath):
metadataDictionary['subjectcode'] = 'text'
metadataDictionary['dataset'] = 'text'
metadataDictionary = addGroupVariablesToDictionary(metadataJSON,
metadataDictionary)
metadataDictionary)
return metadataDictionary


def addGroupVariablesToDictionary(groupMetadata, metadataDictionary):
    """Recursively collect ``code -> sql_type`` pairs from a CDE group.

    Walks the group's own 'variables' and then descends into every nested
    group under 'groups', accumulating into (and returning) the same
    metadataDictionary that was passed in.

    Raises ValueError when a variable has no 'sql_type' field.
    """
    for variable in groupMetadata.get('variables', []):
        if 'sql_type' not in variable:
            raise ValueError(
                'The variable "' + variable['code'] + '" does not contain the sql_type field in the metadata.')
        metadataDictionary[variable['code']] = variable['sql_type']

    # Recurse into nested groups; each call mutates and returns the dictionary.
    for subgroup in groupMetadata.get('groups', []):
        metadataDictionary = addGroupVariablesToDictionary(subgroup, metadataDictionary)

    return metadataDictionary


Expand All @@ -54,46 +57,50 @@ def addGroupVariablesToList(groupMetadata, metadataList):
for variable in groupMetadata['variables']:
variableDictionary = {}
variableDictionary['code'] = variable['code']

if 'label' not in variable:
raise ValueError('The variable "' + variable['code'] + '" does not contain the label field in the metadata.')
raise ValueError(
'The variable "' + variable['code'] + '" does not contain the label field in the metadata.')
variableDictionary['label'] = variable['label']

if 'sql_type' not in variable:
raise ValueError('The variable "' + variable['code'] + '" does not contain the sql_type field in the metadata.')
raise ValueError(
'The variable "' + variable['code'] + '" does not contain the sql_type field in the metadata.')
variableDictionary['sql_type'] = variable['sql_type']

if 'isCategorical' not in variable:
raise ValueError('The variable "' + variable['code'] + '" does not contain the isCategorical field in the metadata.')
raise ValueError(
'The variable "' + variable['code'] + '" does not contain the isCategorical field in the metadata.')
variableDictionary['isCategorical'] = '1' if variable['isCategorical'] else '0'

if variable['isCategorical'] and 'enumerations' not in variable:
raise ValueError('The variable "' + variable['code'] + '" does not contain enumerations even though it is categorical.')

if 'enumerations' in variable:
raise ValueError('The variable "' + variable[
'code'] + '" does not contain enumerations even though it is categorical.')

if 'enumerations' in variable:
enumerations = []
for enumeration in variable['enumerations']:
enumerations.append(unicode(enumeration['code']))
variableDictionary['enumerations'] = ','.join(enumerations)
else:
variableDictionary['enumerations'] = None

if 'min' in variable:
variableDictionary['min'] = variable['min']
else:
variableDictionary['min'] = None

if 'max' in variable:
variableDictionary['max'] = variable['max']
else:
variableDictionary['max'] = None

metadataList.append(variableDictionary)

if 'groups' in groupMetadata:
for group in groupMetadata['groups']:
metadataList = addGroupVariablesToList(group,
metadataList)
metadataList)
return metadataList


Expand Down Expand Up @@ -123,40 +130,43 @@ def addMetadataInTheDatabase(CDEsMetadataPath, cur):
insertVariableQuery += "'" + variable['code'] + "'"
insertVariableQuery += ", '" + variable['label'] + "'"
insertVariableQuery += ", '" + variable['sql_type'] + "'"
insertVariableQuery += ", " + variable['isCategorical']
if variable['enumerations'] :
insertVariableQuery += ", " + variable['isCategorical']
if variable['enumerations']:
insertVariableQuery += ", '" + variable['enumerations'] + "'"
else:
insertVariableQuery += ", NULL"
if variable['min'] :

if variable['min']:
insertVariableQuery += ", '" + variable['min'] + "'"
else:
insertVariableQuery += ", NULL"
if variable['max'] :

if variable['max']:
insertVariableQuery += ", '" + variable['max'] + "'"
else:
insertVariableQuery += ", NULL"
insertVariableQuery += ", NULL"

insertVariableQuery += ");"

try:
cur.execute(insertVariableQuery)
except sqlite3.IntegrityError:
raise ValueError ('Failed to execute query: ' + insertVariableQuery + ' , due to database constraints.')
raise ValueError('Failed to execute query: ' + insertVariableQuery + ' , due to database constraints.')


def createDataTable(metadataDictionary, cur):
# Create the query for the sqlite data table
createDataTableQuery = 'CREATE TABLE DATA('
for column in metadataDictionary:
if metadataDictionary[column] in ['INT','int','Int']:
createDataTableQuery += column + ' ' + metadataDictionary[column] + ' CHECK (TYPEOF(' + column + ') = "integer" OR TYPEOF(' + column + ') = "null"), '
elif metadataDictionary[column] in ['REAL','real','Real']:
createDataTableQuery += column + ' ' + metadataDictionary[column] + ' CHECK (TYPEOF(' + column + ') = "real" OR TYPEOF(' + column + ') = "integer" OR TYPEOF(' + column + ') = "null"), '
elif metadataDictionary[column] in ['TEXT','text','Text']:
createDataTableQuery += column + ' ' + metadataDictionary[column] + ' CHECK (TYPEOF(' + column + ') = "text" OR TYPEOF(' + column + ') = "null"), '
if metadataDictionary[column] in ['INT', 'int', 'Int']:
createDataTableQuery += column + ' ' + metadataDictionary[
column] + ' CHECK (TYPEOF(' + column + ') = "integer" OR TYPEOF(' + column + ') = "null"), '
elif metadataDictionary[column] in ['REAL', 'real', 'Real']:
createDataTableQuery += column + ' ' + metadataDictionary[
column] + ' CHECK (TYPEOF(' + column + ') = "real" OR TYPEOF(' + column + ') = "integer" OR TYPEOF(' + column + ') = "null"), '
elif metadataDictionary[column] in ['TEXT', 'text', 'Text']:
createDataTableQuery += column + ' ' + metadataDictionary[
column] + ' CHECK (TYPEOF(' + column + ') = "text" OR TYPEOF(' + column + ') = "null"), '
# Remove the last comma
createDataTableQuery = createDataTableQuery[:-2]
createDataTableQuery += ')'
Expand All @@ -167,42 +177,56 @@ def createDataTable(metadataDictionary, cur):


def addCSVInTheDataTable(csvFilePath, metadataDictionary, cur):
    """Bulk-insert the rows of a dataset csv into the DATA table.

    Rows are buffered and flushed in batches (multi-row
    ``INSERT INTO DATA (...) VALUES (...),(...)`` statements) so large csvs
    do not cost one statement per row. When a batch fails, it is replayed
    cell by cell through findErrorOnBulkInsertQuery to pinpoint the bad value.

    Fixes over the previous version:
      * the old code peeked with ``next(csvReader, None)`` inside the batch
        condition, which consumed (and silently dropped) a data row on every
        non-boundary iteration, and could leave a trailing partial batch
        uninserted;
      * the csv file handle is now closed via ``with``;
      * the bare ``except`` is narrowed to ``sqlite3.Error``.

    :param csvFilePath: path of the csv file to load
    :param metadataDictionary: column name -> sql type (see createMetadataDictionary)
    :param cur: sqlite3 cursor on the target database
    :raises KeyError: when a csv header column is missing from the metadata
    """
    # Fall back to 100 rows per INSERT if the module-level constant is absent
    # (e.g. when this function is exercised standalone).
    batchLimit = int(globals().get('MAX_ROWS_TO_INSERT_INTO_SQL', 100))

    with open(csvFilePath, 'r') as csvFile:
        csvReader = csv.reader(csvFile)

        # Validate the header against the metadata and build the columns
        # section of the INSERT statement once.
        csvHeader = next(csvReader)
        for column in csvHeader[1:]:
            if column not in metadataDictionary:
                raise KeyError('Column ' + column + ' does not exist in the metadata!')
        columnsSectionOfSQLQuery = 'INSERT INTO DATA (' + ', '.join(csvHeader) + ') VALUES '

        # Buffer one "v1, v2, ..." string per row; flush every batchLimit rows.
        rowValueStrings = []
        for row in csvReader:
            values = []
            for (value, column) in zip(row, csvHeader):
                if metadataDictionary[column] == 'text':
                    values.append("'" + value + "'")
                elif value == '':
                    values.append('null')
                else:
                    values.append(value)  # numeric literal, left unquoted
            rowValueStrings.append(', '.join(values))
            if len(rowValueStrings) == batchLimit:
                _insertBatch(cur, columnsSectionOfSQLQuery, rowValueStrings,
                             csvHeader, metadataDictionary, csvFilePath)
                rowValueStrings = []

        # Flush the trailing partial batch, if any.
        if rowValueStrings:
            _insertBatch(cur, columnsSectionOfSQLQuery, rowValueStrings,
                         csvHeader, metadataDictionary, csvFilePath)


def _insertBatch(cur, columnsSectionOfSQLQuery, rowValueStrings,
                 csvHeader, metadataDictionary, csvFilePath):
    # Execute one multi-row INSERT; the '(...),(...);' layout of the values
    # section is what findErrorOnBulkInsertQuery expects to parse back.
    valuesSectionOfSQLQuery = '(' + '),('.join(rowValueStrings) + ');'
    try:
        cur.execute(columnsSectionOfSQLQuery + valuesSectionOfSQLQuery)
    except sqlite3.Error:
        # Replay the batch row by row to raise a precise error message.
        findErrorOnBulkInsertQuery(cur, valuesSectionOfSQLQuery, csvHeader,
                                   metadataDictionary, csvFilePath)

def findErrorOnBulkInsertQuery(cur, valuesOfQuery, csvHeader, metadataDictionary, csvFilePath):
    """Replay a failed bulk INSERT row by row to locate the offending value.

    ``valuesOfQuery`` is the "(...),(...);"-formatted values section of the
    failed statement. Each row is re-applied column by column via
    findErrorOnSqlQuery, which raises a descriptive ValueError at the bad cell.
    """
    # Strip the opening '(' and the trailing ');'.
    strippedValues = valuesOfQuery[1:-2]
    # Drop the quotes around text values so the naive comma split works.
    strippedValues = strippedValues.replace("'", "")
    # Re-run every row of the failed batch individually.
    for rowText in strippedValues.split('),('):
        findErrorOnSqlQuery(cur, rowText.split(','), csvHeader, metadataDictionary, csvFilePath)

def findErrorOnSqlQuery(cur, row, csvHeader, metadataDictionary, csvFilePath):

# Insert the code column into the database and then update it for each row to find where the problem is
firstRow = True

Expand All @@ -213,7 +237,7 @@ def findErrorOnSqlQuery(cur, row, csvHeader, metadataDictionary, csvFilePath):
insertQuery = "INSERT INTO DATA (subjectcode) VALUES ('" + value + "');"
cur.execute(insertQuery)
continue;

if metadataDictionary[column] == 'text':
updateQuery = "UPDATE DATA SET " + column + " = '" + value + "' WHERE subjectcode = '" + code + "';";
elif value == '':
Expand All @@ -224,12 +248,11 @@ def findErrorOnSqlQuery(cur, row, csvHeader, metadataDictionary, csvFilePath):
try:
cur.execute(updateQuery)
except:
raise ValueError("Error inserting into the database. Could not insert value: '" + value + "', into column: '" + column + "', at row with subjectcode: " + code + ", while inserting csv: " + csvFilePath)

raise ValueError(
"Error inserting into the database. Could not insert value: '" + value + "', into column: '" + column + "', at row with subjectcode: " + code + ", while inserting csv: " + csvFilePath)


def main():

# Read the parameters
parser = ArgumentParser()
parser.add_argument('-f', '--pathologiesFolderPath', required=True,
Expand All @@ -238,42 +261,40 @@ def main():
help='Is this a master or a worker node?'
)
args = parser.parse_args()

pathologiesFolderPath = os.path.abspath(args.pathologiesFolderPath)

# Get all pathologies
pathologiesList = next(os.walk(pathologiesFolderPath))[1]

# Create the datasets db for each pathology
for pathologyName in pathologiesList:

# Initializing metadata and output absolute path
CDEsMetadataPath = os.path.join(pathologiesFolderPath,pathologyName,"CDEsMetadata.json")
outputDBAbsPath = os.path.join(pathologiesFolderPath,pathologyName,"datasets.db")
CDEsMetadataPath = os.path.join(pathologiesFolderPath, pathologyName, "CDEsMetadata.json")
outputDBAbsPath = os.path.join(pathologiesFolderPath, pathologyName, "datasets.db")

# Connect to the database
con = sqlite3.connect(outputDBAbsPath)
cur = con.cursor()

# Add the metadata table + rows
addMetadataInTheDatabase(CDEsMetadataPath, cur)

# Transform the metadata json into a column name -> column type list
metadataDictionary = createMetadataDictionary(CDEsMetadataPath)

# Create the data table with the header
createDataTable(metadataDictionary, cur)

# Add all the csvs in the database
for csv in os.listdir(os.path.join(pathologiesFolderPath,pathologyName)):
for csv in os.listdir(os.path.join(pathologiesFolderPath, pathologyName)):
if csv.endswith('.csv'):
csvFilePath = os.path.join(pathologiesFolderPath,pathologyName,csv)
csvFilePath = os.path.join(pathologiesFolderPath, pathologyName, csv)
addCSVInTheDataTable(csvFilePath, metadataDictionary, cur)



con.commit()
con.close()


# Script entry point: build the per-pathology sqlite databases.
if __name__ == '__main__':
    main()

0 comments on commit e47ce16

Please sign in to comment.