Skip to content

Commit

Permalink
chore: bumped python to 3.9; small refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas ESTRADA committed Jan 22, 2025
1 parent c9c5bcb commit d695afb
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 96 deletions.
67 changes: 2 additions & 65 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ readme = "README.md"
packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = ">=3.8.1,<3.13"
python = ">=3.9,<3.13"
dlt = {version = "1.3.0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb"]}
graphlib-backport = {version = "*", python = "<3.9"}

[tool.poetry.group.dltpure.dependencies]
dlt = {version = "1.3.0", allow-prereleases = true}
Expand Down
2 changes: 1 addition & 1 deletion sources/pg_legacy_replication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def replication_source(
def replication_resource(slot_name: str) -> Iterable[TDataItem]:
# start where we left off in previous run
start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0)
if flush_slot:
if flush_slot and start_lsn > 0:
advance_slot(start_lsn, slot_name, credentials)

# continue until last message in replication slot
Expand Down
56 changes: 28 additions & 28 deletions sources/pg_legacy_replication/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,17 @@ def get_max_lsn(credentials: ConnectionStringCredentials) -> Optional[int]:
Returns None if the replication slot is empty.
Does not consume the slot, i.e. messages are not flushed.
"""
cur = _get_conn(credentials).cursor()
loc_fn = (
"pg_current_xlog_location"
if get_pg_version(cur) < 100000
else "pg_current_wal_lsn"
)
# subtract '0/0' to convert pg_lsn type to int (https://stackoverflow.com/a/73738472)
cur.execute(f"SELECT {loc_fn}() - '0/0' as max_lsn;")
lsn: int = cur.fetchone()[0]
cur.connection.close()
return lsn
with _get_conn(credentials) as conn:
cur = conn.cursor()
loc_fn = (
"pg_current_xlog_location"
if get_pg_version(cur) < 100000
else "pg_current_wal_lsn"
)
# subtract '0/0' to convert pg_lsn type to int (https://stackoverflow.com/a/73738472)
cur.execute(f"SELECT {loc_fn}() - '0/0' as max_lsn;")
lsn: int = cur.fetchone()[0]
return lsn


def lsn_int_to_hex(lsn: int) -> str:
Expand All @@ -262,13 +262,14 @@ def advance_slot(
This function is used as alternative to psycopg2's `send_feedback` method, because
the behavior of that method seems odd when used outside of `consume_stream`.
"""
if upto_lsn != 0:
cur = _get_conn(credentials).cursor()
assert upto_lsn > 0
with _get_conn(credentials) as conn:
cur = conn.cursor()
# There is unfortunately no way in pg9.6 to manually advance the replication slot
if get_pg_version(cur) > 100000:
cur.execute(
f"SELECT * FROM pg_replication_slot_advance('{slot_name}', '{lsn_int_to_hex(upto_lsn)}');"
)
cur.connection.close()


def _get_conn(
Expand Down Expand Up @@ -452,20 +453,19 @@ def __iter__(self) -> Iterator[TableItems]:
Maintains LSN of last consumed commit message in object state.
Advances the slot only when all messages have been consumed.
"""
cur = _get_rep_conn(self.credentials).cursor()
cur.start_replication(slot_name=self.slot_name, start_lsn=self.start_lsn)
consumer = MessageConsumer(
upto_lsn=self.upto_lsn,
table_qnames=self.table_qnames,
repl_options=self.repl_options,
target_batch_size=self.target_batch_size,
)
try:
cur.consume_stream(consumer)
except StopReplication: # completed batch or reached `upto_lsn`
yield from self.flush_batch(cur, consumer)
finally:
cur.connection.close()
with _get_rep_conn(self.credentials) as conn:
cur = conn.cursor()
cur.start_replication(slot_name=self.slot_name, start_lsn=self.start_lsn)
consumer = MessageConsumer(
upto_lsn=self.upto_lsn,
table_qnames=self.table_qnames,
repl_options=self.repl_options,
target_batch_size=self.target_batch_size,
)
try:
cur.consume_stream(consumer)
except StopReplication: # completed batch or reached `upto_lsn`
yield from self.flush_batch(cur, consumer)

def flush_batch(
self, cur: ReplicationCursor, consumer: MessageConsumer
Expand Down

0 comments on commit d695afb

Please sign in to comment.