From c13d46634d8febb2b46667743c38812d65fdc41b Mon Sep 17 00:00:00 2001
From: Katherine Bargar
Date: Mon, 15 Sep 2025 20:21:14 +0000
Subject: [PATCH 1/4] For postgres, switch to using copy to insert data

---
 src/ldlite/_database.py |  9 ++++++---
 src/ldlite/_sqlx.py     | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/src/ldlite/_database.py b/src/ldlite/_database.py
index 0024e4f..564b3b2 100644
--- a/src/ldlite/_database.py
+++ b/src/ldlite/_database.py
@@ -178,13 +178,16 @@ def ingest_records(
         with closing(self._conn_factory()) as conn:
             self._prepare_raw_table(conn, prefix)
             with closing(conn.cursor()) as cur:
-                for pkey, d in records:
+                is_str = None
+                for pkey, r in records:
+                    if is_str is None:
+                        is_str = isinstance(r, str)
                     cur.execute(
                         self._insert_record_sql.format(
                             table=prefix.raw_table_name,
                         ).as_string(),
-                        [pkey, d if isinstance(d, str) else d.decode("utf-8")],
+                        [pkey, r if is_str else cast("bytes", r).decode()],
                     )
                     if not on_processed():
-                        return
+                        break
             conn.commit()
diff --git a/src/ldlite/_sqlx.py b/src/ldlite/_sqlx.py
index e87a38f..8e52aeb 100644
--- a/src/ldlite/_sqlx.py
+++ b/src/ldlite/_sqlx.py
@@ -2,6 +2,7 @@

 import secrets
 import sqlite3
+from contextlib import closing
 from enum import Enum
 from typing import TYPE_CHECKING, Callable, cast

@@ -12,8 +13,11 @@
 from ._database import Database

 if TYPE_CHECKING:
+    from collections.abc import Iterator
+
     from _typeshed import dbapi

+    from ._database import Prefix
     from ._jsonx import JsonValue


@@ -69,6 +73,41 @@ def _insert_record_sql(self) -> sql.SQL:

         return sql.SQL(insert_sql)

+    def ingest_records(
+        self,
+        prefix: Prefix,
+        on_processed: Callable[[], bool],
+        records: Iterator[tuple[int, str | bytes]],
+    ) -> None:
+        if self._dbtype != DBType.POSTGRES:
+            super().ingest_records(prefix, on_processed, records)
+            return
+
+        with closing(self._conn_factory()) as conn:
+            self._prepare_raw_table(conn, prefix)
+            if pgconn := as_postgres(conn, self._dbtype):
+                with (
+                    pgconn.cursor() as cur,
+                    cur.copy(
+                        sql.SQL("COPY {table} (__id, jsonb) FROM STDIN").format(
+                            table=prefix.raw_table_name,
+                        ),
+                    ) as copy,
+                ):
+                    is_str = None
+                    for pkey, r in records:
+                        if is_str is None:
+                            is_str = isinstance(r, str)
+                        copy.write_row(
+                            (
+                                pkey,
+                                r if is_str else cast("bytes", r).decode(),
+                            ),
+                        )
+                        if not on_processed():
+                            break
+                pgconn.commit()
+

 def as_duckdb(
     db: dbapi.DBAPIConnection,

From 25bc039b019e9cb658930516cf517bbcc36b83e5 Mon Sep 17 00:00:00 2001
From: Katherine Bargar
Date: Mon, 15 Sep 2025 20:34:02 +0000
Subject: [PATCH 2/4] Ingest jsonb directly for postgres

---
 src/ldlite/__init__.py  |  6 ++++--
 src/ldlite/_database.py | 36 +++++++++++++++++++++++------------
 src/ldlite/_folio.py    |  6 +++---
 src/ldlite/_sqlx.py     | 45 ++++++++++++++++++++++++++---------------
 4 files changed, 59 insertions(+), 34 deletions(-)

diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py
index 063bea9..59381f4 100644
--- a/src/ldlite/__init__.py
+++ b/src/ldlite/__init__.py
@@ -62,6 +62,8 @@
 from ._xlsx import to_xlsx

 if TYPE_CHECKING:
+    from collections.abc import Iterator
+
     from _typeshed import dbapi
     from httpx_folio.query import QueryType

@@ -362,7 +364,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
             self.page_size,
             query=cast("QueryType", query),
         )
-        (total_records, _) = next(records)
+        total_records = cast("int", next(records))
         total = min(total_records, limit or total_records)
         if self._verbose:
             print("ldlite: estimated row count: " + str(total), file=sys.stderr)
@@ -403,7 +405,7 @@ def on_processed_limit() -> bool:
         self._db.ingest_records(
             prefix,
             on_processed_limit if limit is not None else on_processed,
-            records,
+            cast("Iterator[tuple[bytes, bytes] | tuple[int, str]]", records),
         )

         pbar.close()
diff --git a/src/ldlite/_database.py b/src/ldlite/_database.py
index 564b3b2..a18a0bf 100644
--- a/src/ldlite/_database.py
+++ b/src/ldlite/_database.py
@@ -173,21 +173,31 @@ def ingest_records(
         self,
         prefix: Prefix,
         on_processed: Callable[[], bool],
-        records: Iterator[tuple[int, str | bytes]],
+        records: Iterator[tuple[bytes, bytes] | tuple[int, str]],
     ) -> None:
         with closing(self._conn_factory()) as conn:
             self._prepare_raw_table(conn, prefix)
+            insert_sql = self._insert_record_sql.format(
+                table=prefix.raw_table_name,
+            ).as_string()
             with closing(conn.cursor()) as cur:
-                is_str = None
-                for pkey, r in records:
-                    if is_str is None:
-                        is_str = isinstance(r, str)
-                    cur.execute(
-                        self._insert_record_sql.format(
-                            table=prefix.raw_table_name,
-                        ).as_string(),
-                        [pkey, r if is_str else cast("bytes", r).decode()],
-                    )
-                    if not on_processed():
-                        break
+                fr = next(records)
+                if isinstance(fr[0], bytes):
+                    record = fr
+                    while record is not None:
+                        (pkey, rb) = record
+                        cur.execute(
+                            insert_sql,
+                            (int.from_bytes(pkey, "big"), rb.decode()),
+                        )
+                        if not on_processed():
+                            break
+                        record = cast("tuple[bytes, bytes]", next(records, None))
+                else:
+                    cur.execute(insert_sql, fr)
+                    for r in records:
+                        cur.execute(insert_sql, r)
+                        if not on_processed():
+                            break
+
             conn.commit()
diff --git a/src/ldlite/_folio.py b/src/ldlite/_folio.py
index cea5d7d..b6e4cd4 100644
--- a/src/ldlite/_folio.py
+++ b/src/ldlite/_folio.py
@@ -28,7 +28,7 @@ def iterate_records(
         retries: int,
         page_size: int,
         query: QueryType | None = None,
-    ) -> Iterator[tuple[int, str | bytes]]:
+    ) -> Iterator[int | tuple[bytes, bytes] | tuple[int, str]]:
         """Iterates all records for a given path.
        Returns:
@@ -54,7 +54,7 @@ def iterate_records(
             res.raise_for_status()
             j = orjson.loads(res.text)
             r = int(j["totalRecords"])
-            yield (r, b"")
+            yield r

         if r == 0:
             return
@@ -103,7 +103,7 @@ def iterate_records(
             last = None
             for r in (o for o in orjson.loads(res.text)[key] if o is not None):
                 last = r
-                yield (next(pkey), orjson.dumps(r))
+                yield (next(pkey).to_bytes(4, "big"), orjson.dumps(r))

             if last is None:
                 return
diff --git a/src/ldlite/_sqlx.py b/src/ldlite/_sqlx.py
index 8e52aeb..2510a8e 100644
--- a/src/ldlite/_sqlx.py
+++ b/src/ldlite/_sqlx.py
@@ -77,7 +77,7 @@ def ingest_records(
         self,
         prefix: Prefix,
         on_processed: Callable[[], bool],
-        records: Iterator[tuple[int, str | bytes]],
+        records: Iterator[tuple[bytes, bytes] | tuple[int, str]],
     ) -> None:
         if self._dbtype != DBType.POSTGRES:
             super().ingest_records(prefix, on_processed, records)
@@ -85,27 +85,40 @@ def ingest_records(

         with closing(self._conn_factory()) as conn:
             self._prepare_raw_table(conn, prefix)
+
+            fr = next(records)
+            copy_from = "COPY {table} (__id, jsonb) FROM STDIN"
+            if is_bytes := isinstance(fr[0], bytes):
+                copy_from += " (FORMAT BINARY)"
+
             if pgconn := as_postgres(conn, self._dbtype):
                 with (
                     pgconn.cursor() as cur,
                     cur.copy(
-                        sql.SQL("COPY {table} (__id, jsonb) FROM STDIN").format(
-                            table=prefix.raw_table_name,
-                        ),
+                        sql.SQL(copy_from).format(table=prefix.raw_table_name),
                     ) as copy,
                 ):
-                    is_str = None
-                    for pkey, r in records:
-                        if is_str is None:
-                            is_str = isinstance(r, str)
-                        copy.write_row(
-                            (
-                                pkey,
-                                r if is_str else cast("bytes", r).decode(),
-                            ),
-                        )
-                        if not on_processed():
-                            break
+                    if is_bytes:
+                        # postgres jsonb is always version 1
+                        # and it always goes in front
+                        jver = (1).to_bytes(1, "big")
+                        record = fr
+                        while record is not None:
+                            pkey, rb = record
+                            rpg = bytearray()
+                            rpg.extend(jver)
+                            rpg.extend(cast("bytes", rb))
+                            copy.write_row((pkey, rpg))
+                            if not on_processed():
+                                break
+                            record = cast("tuple[bytes, bytes]", next(records, None))
+                    else:
+                        copy.write_row(fr)
+                        for r in records:
+                            copy.write_row(r)
+                            if not on_processed():
+                                break
+
                 pgconn.commit()

From d9caa4700a117ed101343a50532ab838cfa1d2e7 Mon Sep 17 00:00:00 2001
From: Katherine Bargar
Date: Tue, 16 Sep 2025 16:24:12 +0000
Subject: [PATCH 3/4] Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 42beae1..494909c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ Please see [MIGRATING.md](./MIGRATING.md) for information on breaking changes.
 - psycopg3 is now used for internal operations. LDLite.connect_db_postgres will return a psycopg3 connection instead of psycopg2 in the next major release.
 - psycopg2 is now installed using the binary version.
 - Refactored internal database handling logic
+- Ingesting data into postgres now uses COPY FROM, which significantly improves download performance.

 ### Removed


From 1386777a0d27ecba59a3989ece6a6c41e67e66e6 Mon Sep 17 00:00:00 2001
From: Katherine Bargar
Date: Tue, 16 Sep 2025 16:28:44 +0000
Subject: [PATCH 4/4] Rename variable

---
 src/ldlite/_sqlx.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/ldlite/_sqlx.py b/src/ldlite/_sqlx.py
index 2510a8e..18743d1 100644
--- a/src/ldlite/_sqlx.py
+++ b/src/ldlite/_sqlx.py
@@ -105,10 +105,10 @@ def ingest_records(
                         record = fr
                         while record is not None:
                             pkey, rb = record
-                            rpg = bytearray()
-                            rpg.extend(jver)
-                            rpg.extend(cast("bytes", rb))
-                            copy.write_row((pkey, rpg))
+                            rbpg = bytearray()
+                            rbpg.extend(jver)
+                            rbpg.extend(cast("bytes", rb))
+                            copy.write_row((pkey, rbpg))
                             if not on_processed():
                                 break
                             record = cast("tuple[bytes, bytes]", next(records, None))
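
Reviewer note (not part of the patches): the sketch below is a minimal, self-contained illustration of the psycopg3 COPY ... FROM STDIN pattern that patches 1 and 2 adopt, including the binary-format detail that a jsonb payload on the wire is a one-byte version prefix (always 1) followed by the JSON text, while an int4 field is exactly 4 big-endian bytes. The connection string, table name, and sample records are assumptions made up for the example, not LDLite code.

# Minimal sketch (assumed DSN, table name, and sample data) of streaming rows
# into Postgres with psycopg3's COPY ... FROM STDIN (FORMAT BINARY), mirroring
# the approach taken in _sqlx.py.
import json

import psycopg
from psycopg import sql

rows = [(1, {"id": "rec-a"}), (2, {"id": "rec-b"})]  # stand-in records

with psycopg.connect("dbname=ldlite_demo") as conn:  # assumed DSN
    with conn.cursor() as cur:
        # Same column shape as the raw tables these patches target.
        cur.execute("CREATE TABLE IF NOT EXISTS demo_raw (__id integer, jsonb jsonb)")
        with cur.copy(
            sql.SQL("COPY {t} (__id, jsonb) FROM STDIN (FORMAT BINARY)").format(
                t=sql.Identifier("demo_raw"),
            ),
        ) as copy:
            for pkey, record in rows:
                # In COPY BINARY each field is a length-prefixed byte string that
                # the server hands to the column type's receive function:
                #   - int4 expects exactly 4 big-endian bytes
                #   - jsonb expects a version byte (always 1) followed by JSON text
                payload = b"\x01" + json.dumps(record).encode()
                copy.write_row((pkey.to_bytes(4, "big"), payload))
    conn.commit()

Patch 1 uses the simpler text format (no FORMAT BINARY option, with plain int and str values passed to write_row); patch 2 switches to the binary form above so the JSON bytes fetched from FOLIO can be forwarded without re-encoding.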