Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: postgres integration #959

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 106 additions & 28 deletions sotodlib/core/metadata/common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,96 @@
import sqlite3
import psycopg
import gzip
import os

# SQL template that reconstructs a CREATE TABLE statement for one table by
# reading information_schema.  Interpolated twice with the same table name
# (constraint subquery + column filter), i.e. GET_TABLE_CREATE % (name, name).
# Notes:
#  - the emitted statement uses the real table name (taken from
#    information_schema), not a hard-coded placeholder;
#  - information_schema reports is_nullable as 'YES'/'NO' (uppercase), so the
#    NOT NULL test must compare against 'NO'.
GET_TABLE_CREATE = """with table_info as (
    select
        c.table_name,
        c.column_name,
        c.data_type,
        c.character_maximum_length,
        c.is_nullable,
        tc.constraint_type,
        tc.constraint_name
    from
        information_schema.columns as c
    left join (
        select
            kcu.column_name,
            tc.constraint_type,
            tc.constraint_name,
            kcu.table_name
        from
            information_schema.table_constraints as tc
        join
            information_schema.key_column_usage as kcu
            on tc.constraint_name = kcu.constraint_name
        where
            tc.table_name = '%s'
    ) as tc
        on c.column_name = tc.column_name
    where
        c.table_name = '%s'
)
select
    'create table ' || min(table_name) || ' (' || string_agg(
        column_name || ' ' ||
        data_type ||
        case
            when character_maximum_length is not null
                then '(' || character_maximum_length || ')'
            else ''
        end ||
        case
            when is_nullable = 'NO' then ' not null'
            else ''
        end ||
        case
            when constraint_type is not null then
                ' constraint ' || constraint_name || ' ' || constraint_type
            else ''
        end,
        ', '
    ) || ');'
from
    table_info;
"""

def sqlite_to_file(db, filename, overwrite=True, fmt=None):

def dump_database(conn: "psycopg.Connection") -> str:
    """Serialize every table in the 'public' schema to a string of SQL
    statements: a CREATE TABLE followed by one INSERT per row, one
    statement per line.

    Args:
        conn: an open psycopg connection to the database to dump.

    Returns:
        str: the SQL text; replaying it statement-by-statement against an
        empty database rebuilds the schema and data.
    """
    with conn.cursor() as cur:
        db_dump = ""
        # All user tables live in the 'public' schema.
        cur.execute(
            "select table_name from information_schema.tables where table_schema='public'"
        )
        tables = cur.fetchall()

        for (table_name,) in tables:
            # Reconstruct the CREATE TABLE statement from information_schema.
            # NOTE(review): table_name comes back from information_schema,
            # but %-interpolation into SQL is still fragile for unusual
            # identifiers; consider psycopg.sql composition.
            cur.execute(GET_TABLE_CREATE % (table_name, table_name))
            create_table = cur.fetchone()[0] + "\n"
            db_dump += create_table
            columns = cur.execute(
                "select column_name, data_type, character_maximum_length from "
                + "information_schema.columns where table_name = '%s';" % table_name
            ).fetchall()
            column_names = ", ".join(f"{col[0]}" for col in columns)
            # Dump table contents as INSERT statements.
            cur.execute(f"select {column_names} from {table_name}")
            rows = cur.fetchall()
            for row in rows:
                # Double embedded single quotes ('' is the SQL escape) so a
                # value like "it's" does not break the generated INSERT.
                values = ", ".join(
                    "NULL" if value is None
                    else "'" + str(value).replace("'", "''") + "'"
                    for value in row
                )
                db_dump += (
                    f"insert into {table_name} ({column_names}) values ({values});\n"
                )

    return db_dump


def postgres_to_file(
    db: "psycopg.Connection", filename: str, overwrite: bool = True, fmt: str = None
) -> None:
    """Write a postgres db dump to file.  Supports several output formats.

    Args:
        db: an open psycopg connection to the database to dump.
        filename (str): destination path.
        overwrite (bool): if False and the file already exists, raise
            instead of replacing it.
        fmt (str): 'dump' for plain-text SQL, 'gz' for gzip-compressed
            SQL.  If None, inferred from the filename ('.gz' selects
            'gz', otherwise 'dump').

    Raises:
        RuntimeError: if the file exists and overwrite is False, or the
            requested format is not recognized.
    """
    if fmt is None:
        if filename.endswith('.gz'):
            fmt = 'gz'
        else:
            fmt = 'dump'
    if os.path.exists(filename) and not overwrite:
        raise RuntimeError(f'File {filename} exists; remove or pass '
                           'overwrite=True.')
    if fmt == 'dump':
        # dump_database returns one str; write it whole rather than
        # iterating it character-by-character.
        with open(filename, 'w') as fout:
            fout.write(dump_database(db))
    elif fmt == 'gz':
        # GzipFile opened 'wb' requires bytes, so encode the dump.
        with gzip.GzipFile(filename, 'wb') as fout:
            fout.write(dump_database(db).encode('utf-8'))
    else:
        raise RuntimeError(f'Unknown format "{fmt}" requested.')

def sqlite_from_file(filename, fmt=None, force_new_db=True):

def postgres_from_file(filename: str, db: "psycopg.Connection", fmt: str = None) -> None:
    """Populate a postgres database from a dump file previously written
    by postgres_to_file: each non-blank line of the dump is executed as
    one SQL statement, then the connection is committed.

    Args:
        filename (str): path to the dump file.
        db: a connection to a new (empty) database that receives the
            statements.
        fmt (str): format of the input; see postgres_to_file for details.
            A '.gz' filename suffix always selects 'gz'.

    Raises:
        RuntimeError: if the requested format is not recognized.
    """
    if fmt is None:
        fmt = 'dump'
    if filename.endswith('.gz'):
        fmt = 'gz'
    if fmt == 'dump':
        with open(filename, 'r') as fin:
            data = fin.readlines()
    elif fmt == 'gz':
        # GzipFile.readlines() returns a list of bytes lines; decode each
        # one (the list itself has no .decode()).
        with gzip.GzipFile(filename, 'r') as fin:
            data = [line.decode('utf-8') for line in fin.readlines()]
    else:
        raise RuntimeError(f'Unknown format "{fmt}" requested.')

    with db.cursor() as cursor:
        for datum in data:
            statement = datum.strip()
            if statement:  # skip blank lines; don't execute ''
                cursor.execute(statement)
    db.commit()
Loading
Loading