Send data to database using plain SQL and psycopg2 #7

Open · wants to merge 12 commits into base: master
4 changes: 3 additions & 1 deletion .gitignore
@@ -2,4 +2,6 @@
Icon?
__pycache__
*.csv
desktop.ini
.venv/
.vscode/
52 changes: 31 additions & 21 deletions column_casting.py
@@ -1,6 +1,3 @@
import sqlalchemy


# Default database datatype for all non-derived data is text
# This file specifies columns to cast and what the Python and database datatypes should be before importing to the database

@@ -11,22 +8,28 @@
# But that first step is currently disabled, thought to be unnecessary, so conversion goes:
# str -> db_datatype in dfs_to_db()
# DateTime, boolean, and string data values are handled correctly automatically
# Conversion to db_datatype disregards the sentence_type, so if column db_datatypes are different, column names must be unique

# Database datatypes reference: https://www.tutorialspoint.com/postgresql/postgresql_data_types.htm


datatype_dict = {} # Must contain (key, value) pair for all destination datatypes
datatype_dict['Int16'] = sqlalchemy.types.SmallInteger()
datatype_dict['Int32'] = sqlalchemy.types.Integer()
datatype_dict['float32'] = sqlalchemy.types.Float(precision=6)
datatype_dict['text'] = sqlalchemy.types.Text()

# Must contain (key, value) pair for all destination datatypes
datatype_dict = {}
datatype_dict['Int16'] = 'smallint'
datatype_dict['Int32'] = 'integer'
datatype_dict['float32'] = 'decimal'
datatype_dict['text'] = 'varchar(8)'

# This db_datatypes dictionary is completed in dfs_to_db()
db_datatypes = {}
db_datatypes['cycle_id'] = sqlalchemy.types.Integer()

db_datatypes['unique_id'] = 'varchar(32)'
db_datatypes['cycle_id'] = 'integer'
db_datatypes['talker'] = 'varchar(2)'
db_datatypes['sentence_type'] = 'varchar(4)'
db_datatypes['datetime'] = 'timestamp'
db_datatypes['datetime_is_interpolated'] = 'boolean'
db_datatypes['sentence_is_merged_from_multiple'] = 'boolean'
db_datatypes['latitude'] = 'decimal'
db_datatypes['longitude'] = 'decimal'

columns_to_cast = {}

Expand All @@ -46,19 +49,22 @@
'sv_prn_num_13', 'elevation_deg_13', 'azimuth_13', 'snr_13',
'sv_prn_num_14', 'elevation_deg_14', 'azimuth_14', 'snr_14',
'sv_prn_num_15', 'elevation_deg_15', 'azimuth_15', 'snr_15',
'sv_prn_num_16', 'elevation_deg_16', 'azimuth_16', 'snr_16',]
'sv_prn_num_16', 'elevation_deg_16', 'azimuth_16', 'snr_16']

columns_to_cast['RMC', 'Int32'] = ['datestamp']
columns_to_cast['RMC', 'float32'] = ['timestamp', 'lat', 'lon', 'spd_over_grnd', 'true_course', 'mag_variation']
columns_to_cast['RMC', 'text'] = ['status', 'lat_dir', 'lon_dir', 'mode', 'mag_var_dir']
columns_to_cast['RMC', 'text'] = ['status', 'lat_dir', 'lon_dir', 'mode', 'mag_var_dir', 'nav_status', 'mode_indicator']
# For RMC, added 'nav_status' and 'mode_indicator'

columns_to_cast['GGA', 'float32'] = ['timestamp', 'lat', 'lon', 'horizontal_dil', 'altitude', 'geo_sep']
columns_to_cast['GGA', 'float32'] = ['timestamp', 'lat', 'lon', 'horizontal_dil', 'altitude', 'geo_sep', 'age_gps_data', 'ref_station_id']
columns_to_cast['GGA', 'Int16'] = ['gps_qual', 'num_sats']
columns_to_cast['GGA', 'text'] = ['lat_dir', 'altitude_units', 'geo_sep_units']
# TODO: For GGA, unsure about 'age_gps_data' and 'ref_station_id'
columns_to_cast['GGA', 'text'] = ['lat_dir', 'lon_dir', 'altitude_units', 'geo_sep_units']
# For GGA, unsure about 'age_gps_data' and 'ref_station_id'; they could be 'Int32' or 'float32'
# For GGA, added 'lon_dir'

columns_to_cast['GLL', 'float32'] = ['lat', 'lon']
columns_to_cast['GLL', 'float32'] = ['timestamp', 'lat', 'lon']
columns_to_cast['GLL', 'text'] = ['lat_dir', 'lon_dir', 'status', 'faa_mode']
# For GLL, added 'timestamp'

columns_to_cast['VTG', 'float32'] = ['true_track', 'mag_track', 'spd_over_grnd_kts', 'spd_over_grnd_kmph']
columns_to_cast['VTG', 'text'] = ['true_track_sym', 'mag_track_sym', 'spd_over_grnd_kts_sym', 'spd_over_grnd_kmph_sym', 'faa_mode']
@@ -68,8 +74,12 @@
'gp_sv_id09', 'gp_sv_id10', 'gp_sv_id11', 'gp_sv_id12',
'gl_sv_id01', 'gl_sv_id02', 'gl_sv_id03', 'gl_sv_id04',
'gl_sv_id05', 'gl_sv_id06', 'gl_sv_id07', 'gl_sv_id08',
'gl_sv_id09', 'gl_sv_id10', 'gl_sv_id11', 'gl_sv_id12',]
'gl_sv_id09', 'gl_sv_id10', 'gl_sv_id11', 'gl_sv_id12',
'ga_sv_id01', 'ga_sv_id02', 'ga_sv_id03', 'ga_sv_id04',
'ga_sv_id05', 'ga_sv_id06', 'ga_sv_id07', 'ga_sv_id08',
'ga_sv_id09', 'ga_sv_id10', 'ga_sv_id11', 'ga_sv_id12']
columns_to_cast['GSA', 'float32'] = ['pdop', 'hdop', 'vdop']
columns_to_cast['GSA', 'text'] = ['mode']
# For GSA, added 'ga_sv_id01' .. 'ga_sv_id12'

columns_to_cast['GST', 'float32'] = ['rms', 'std_dev_latitude', 'std_dev_longitude', 'std_dev_altitude']
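
For reference, a minimal sketch of how datatype_dict and columns_to_cast could be expanded into the flat db_datatypes map. Per the comments above this happens in dfs_to_db(), which is not part of this diff, so the exact code is an assumption:

```python
# Hypothetical illustration only -- dfs_to_db() is not shown in this diff.
from column_casting import columns_to_cast, datatype_dict, db_datatypes


def build_db_datatypes() -> dict:
    """Expand columns_to_cast into the flat column-name -> db-datatype map."""
    for (sentence_type, dtype), column_names in columns_to_cast.items():
        # sentence_type is disregarded here, hence the requirement above that
        # column names be unique when their db_datatypes differ.
        for column_name in column_names:
            db_datatypes[column_name] = datatype_dict[dtype]
    return db_datatypes
```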
4 changes: 2 additions & 2 deletions db_creds.py
@@ -1,5 +1,5 @@
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "nmea_data"
123 changes: 101 additions & 22 deletions db_data_import.py
@@ -1,41 +1,120 @@
IF_EXISTS_OPT = 'append' # 'fail', 'replace', or 'append', see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html


import os
import sys
import sqlalchemy  # import create_engine
import psycopg2

# Local modules/library files:
import db_creds

import pandas as pd
import psycopg2

def send_data_to_db(log_file_path, dfs, table_name_base, table_name_suffixes=None, dtypes=None):
from column_casting import db_datatypes
from db_utils import create_table, run_db_command

log_file_name = os.path.basename(log_file_path)
# # 'fail', 'replace', or 'append', see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
# IF_EXISTS_OPT = 'append'

db_access_str = f'postgresql://{db_creds.DB_USER}:{db_creds.DB_PASSWORD}@{db_creds.DB_HOST}:{db_creds.DB_PORT}/{db_creds.DB_NAME}'
engine = sqlalchemy.create_engine(db_access_str)

# uses psycopg2.connection.cursor.execute()
def send_data_to_db(dfs: list[pd.DataFrame], table_name_base: str, table_name_suffixes=None, dtypes=None):
table_names = []

# Put data in database
for df_idx, df in enumerate(dfs):

if_exists_opt_loc = IF_EXISTS_OPT

table_name = table_name_base
if table_name_suffixes:
table_name = table_name + '_' + table_name_suffixes[df_idx]

# Create column datatypes collection
columns: list[dict] = []
for name, datatype in db_datatypes.items():
if name in df.columns:
columns.append({'name': name, 'datatype': datatype})

# Create table in database for talker type in current dataframe
try:
create_table(table_name, columns)
except psycopg2.OperationalError as ex:
# Print error text bold and red
sys.exit(f"\n\n\033[1m\033[91mERROR creating database tables:\n {ex}\033[0m\n\nExiting.\n\n")

# Contains all values that must be inserted into the placeholders in the SQL command
values = []

# Create an SQL INSERT command, but use placeholders for inputs.
# Data gets inserted into the query by the execute() function.
db_command = f"INSERT INTO \"{table_name}\" ("

# Keys placeholders
for col in df.columns:
# Add placeholder in string for key
db_command = db_command + str(col) + ", "
# Don't append a comma after the last key
if db_command[-2:] == ", ":
db_command = db_command[:-2]

db_command = db_command + ") VALUES "

# Value placeholders for one row
# (%s, %s, ..., %s)
placeholders_row = '('
for _ in range(df.shape[1]):
# Add placeholder in string for row value
placeholders_row = placeholders_row + "%s, "
# Don't append a comma after the last value of a row
if placeholders_row[-2:] == ", ":
placeholders_row = placeholders_row[:-2]
placeholders_row = placeholders_row + ')'

# Value placeholders for all the rows
# (%s, %s, ..., %s), (%s, %s, ..., %s), ..., (%s, %s, ..., %s)
for idx in range(df.shape[0]):
# Add placeholder in string for whole row
db_command = db_command + placeholders_row + ", "

# Add all values of a row to values list
values.extend(df.values[idx])
# Don't append a comma after the last row
if db_command[-2:] == ", ":
db_command = db_command[:-2]

db_command = db_command + ';'
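
# At this point, for a dataframe with columns (timestamp, status) and two
# rows, db_command looks like (illustrative):
#   INSERT INTO "nmea_gn_rmc" (timestamp, status) VALUES (%s, %s), (%s, %s);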

# Replace all types that are not compatible with psycopg2 with None
values = [None if (type(v) == type(pd.NaT) or str(v) == "nan") else v for v in values] # noqa: E721

# Write current dataframe to database table for talker of this dataframe
try:
df.to_sql(table_name, engine, method='multi', if_exists=if_exists_opt_loc, index=False, dtype=dtypes)
except (sqlalchemy.exc.OperationalError, psycopg2.OperationalError) as e:
sys.exit(f"\n\n\033[1m\033[91mERROR writing to database:\n {e}\033[0m\n\nExiting.\n\n") # Print error text bold and red
run_db_command(db_command, tuple(values))
except psycopg2.OperationalError as ex:
# Print error text bold and red
sys.exit(f"\n\n\033[1m\033[91mERROR writing to database:\n {ex}\033[0m\n\nExiting.\n\n")

table_names.append(table_name)

return table_names
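
A usage sketch of the new entry point; the caller and the dfs list come from the parsing code, which is outside this diff, and the suffixes here are assumed from db_table_lists.py:

```python
# Hypothetical call site -- not part of this diff.
# dfs: one dataframe per sentence type, e.g. from the NMEA parsing step.
table_names = send_data_to_db(dfs, 'nmea', table_name_suffixes=['gn_rmc', 'gn_gga'])
# -> writes to tables nmea_gn_rmc and nmea_gn_gga and returns their names
```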


# # uses pandas.DataFrame.to_sql()
# def _send_data_to_db(dfs: list[pd.DataFrame], table_name_base: str, table_name_suffixes=None, dtypes=None):
# db_access_str = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# engine = sqlalchemy.create_engine(db_access_str)
#
# table_names = []
#
# # Put data in database
# for df_idx, df in enumerate(dfs):
#
# if_exists_opt_loc = IF_EXISTS_OPT
#
# table_name = table_name_base
# if table_name_suffixes:
# table_name = table_name + '_' + table_name_suffixes[df_idx]
#
# try:
# df.to_sql(table_name, engine, method='multi', if_exists=if_exists_opt_loc, index=False, dtype=dtypes)
# except (sqlalchemy.exc.OperationalError, psycopg2.OperationalError) as ex:
# # Print error text bold and red
# sys.exit(f"\n\n\033[1m\033[91mERROR writing to database:\n {ex}\033[0m\n\nExiting.\n\n")
#
# table_names.append(table_name)
#
# return table_names

# TODO: Create separate table for log file IDs and names. Check what the current largest ID is, then append a column to
# the dfs with that ID + 1, and a row to the log file table with that ID and the log file name, or something like that
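
db_utils.py itself is not included in this diff. As a minimal sketch under stated assumptions, the two imported helpers might look roughly like this, opening a psycopg2 connection from db_creds (signatures inferred from the call sites above):

```python
# Hypothetical sketch of db_utils.py -- the real module is not shown in this PR.
import psycopg2

import db_creds


def _connect():
    """Open a new connection using the credentials from db_creds.py."""
    return psycopg2.connect(user=db_creds.DB_USER, password=db_creds.DB_PASSWORD,
                            host=db_creds.DB_HOST, port=db_creds.DB_PORT,
                            dbname=db_creds.DB_NAME)


def create_table(table_name: str, columns: list[dict]):
    """Create the table if missing; columns is [{'name': ..., 'datatype': ...}]."""
    # Identifiers come from the internal column_casting maps, not user input.
    column_defs = ", ".join(f"{col['name']} {col['datatype']}" for col in columns)
    run_db_command(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({column_defs});')


def run_db_command(db_command: str, values: tuple = None):
    """Execute one SQL command; values fill any %s placeholders."""
    conn = _connect()
    try:
        with conn, conn.cursor() as cur:  # 'with conn' commits on success
            cur.execute(db_command, values)
    finally:
        conn.close()
```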
22 changes: 11 additions & 11 deletions db_table_lists.py
@@ -1,11 +1,11 @@
nmea_tables = [
'nmea_gl_gsv',
'nmea_gn_gga',
'nmea_gn_gll',
'nmea_gn_gns',
'nmea_gn_gsa',
'nmea_gn_gst',
'nmea_gn_rmc',
'nmea_gn_vtg',
'nmea_gp_gsv',
]
NMEA_TABLES = [
'nmea_gl_gsv',
'nmea_gn_gga',
'nmea_gn_gll',
'nmea_gn_gns',
'nmea_gn_gsa',
'nmea_gn_gst',
'nmea_gn_rmc',
'nmea_gn_vtg',
'nmea_gp_gsv',
]