From d695afb1e67ba60bf613cbd6b017c334ea00c5f7 Mon Sep 17 00:00:00 2001 From: Nicolas ESTRADA Date: Wed, 22 Jan 2025 17:14:00 +0100 Subject: [PATCH] chore: bumped python to 3.9; small refactorings --- poetry.lock | 67 +---------------------- pyproject.toml | 3 +- sources/pg_legacy_replication/__init__.py | 2 +- sources/pg_legacy_replication/helpers.py | 56 +++++++++---------- 4 files changed, 32 insertions(+), 96 deletions(-) diff --git a/poetry.lock b/poetry.lock index b10a9c72b..4096d4472 100644 --- a/poetry.lock +++ b/poetry.lock @@ -403,34 +403,6 @@ files = [ {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, ] -[[package]] -name = "backports-zoneinfo" -version = "0.2.1" -description = "Backport of the standard library zoneinfo module" -optional = false -python-versions = ">=3.6" -files = [ - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:da6013fd84a690242c310d77ddb8441a559e9cb3d3d59ebac9aca1a57b2e18bc"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:89a48c0d158a3cc3f654da4c2de1ceba85263fafb861b98b59040a5086259722"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:1c5742112073a563c81f786e77514969acb58649bcdf6cdf0b4ed31a348d4546"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win32.whl", hash = "sha256:e8236383a20872c0cdf5a62b554b27538db7fa1bbec52429d8d106effbaeca08"}, - {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8439c030a11780786a2002261569bdf362264f605dfa4d65090b64b05c9f79a7"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:17746bd546106fa389c51dbea67c8b7c8f0d14b5526a579ca6ccf5ed72c526cf"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:5c144945a7752ca544b4b78c8c41544cdfaf9786f25fe5ffb10e838e19a27570"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win32.whl", hash = "sha256:e55b384612d93be96506932a786bbcde5a2db7a9e6a4bb4bffe8b733f5b9036b"}, - {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:a76b38c52400b762e48131494ba26be363491ac4f9a04c1b7e92483d169f6582"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:8961c0f32cd0336fb8e8ead11a1f8cd99ec07145ec2931122faaac1c8f7fd987"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e81b76cace8eda1fca50e345242ba977f9be6ae3945af8d46326d776b4cf78d1"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7b0a64cda4145548fed9efc10322770f929b944ce5cee6c0dfe0c87bf4c0c8c9"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-win32.whl", hash = "sha256:1b13e654a55cd45672cb54ed12148cd33628f672548f373963b0bff67b217328"}, - {file = "backports.zoneinfo-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:4a0f800587060bf8880f954dbef70de6c11bbe59c673c3d818921f042f9954a6"}, - {file = "backports.zoneinfo-0.2.1.tar.gz", hash = "sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2"}, -] - -[package.extras] -tzdata = ["tzdata"] - [[package]] name = "bandit" version = "1.7.5" @@ -730,7 +702,6 @@ files = [ clickhouse-connect = ">=0.5.7" duckdb = ">=0.7.1" fastapi = "0.85.1" -graphlib-backport = {version = ">=1.0.3", markers = "python_version < \"3.9\""} hnswlib = ">=0.7" numpy = ">=1.21.6" onnxruntime = ">=1.14.1" @@ -1186,7 +1157,6 @@ gcsfs = {version = ">=2022.4.0", optional = true, markers = "extra == \"gcp\" or gitpython = ">=3.1.29" giturlparse = ">=0.10.0" google-cloud-bigquery = {version = ">=2.26.0", optional = true, markers = "extra == \"gcp\" or extra == \"bigquery\""} -graphlib-backport = {version = "*", markers = "python_version < \"3.9\""} grpcio = {version = ">=1.50.0", optional = true, markers = "extra == \"gcp\" or extra == \"bigquery\""} hexbytes = ">=0.2.2" humanize = ">=4.4.0" @@ -1272,7 +1242,6 @@ files = [ ] [package.dependencies] -importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.9\""} natsort = ">=7.0.1" typing-extensions = ">=3.7.4.1" @@ -2043,17 +2012,6 @@ protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.1 || >4.21.1,<4 [package.extras] grpc = ["grpcio (>=1.44.0,<2.0.0.dev0)"] -[[package]] -name = "graphlib-backport" -version = "1.0.3" -description = "Backport of the Python 3.9 graphlib module for Python 3.6+" -optional = false -python-versions = ">=3.6,<4.0" -files = [ - {file = "graphlib_backport-1.0.3-py3-none-any.whl", hash = "sha256:24246967b9e7e6a91550bc770e6169585d35aa32790258579a8a3899a8c18fde"}, - {file = "graphlib_backport-1.0.3.tar.gz", hash = "sha256:7bb8fc7757b8ae4e6d8000a26cd49e9232aaa9a3aa57edb478474b8424bfaae2"}, -] - [[package]] name = "greenlet" version = "2.0.2" @@ -2420,24 +2378,6 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker perf = ["ipython"] testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] -[[package]] -name = "importlib-resources" -version = "6.4.0" -description = "Read resources from Python packages" -optional = false -python-versions = ">=3.8" -files = [ - {file = "importlib_resources-6.4.0-py3-none-any.whl", hash = "sha256:50d10f043df931902d4194ea07ec57960f66a80449ff867bfe782b4c486ba78c"}, - {file = "importlib_resources-6.4.0.tar.gz", hash = "sha256:cdb2b453b8046ca4e3798eb1d84f3cce1446a0e8e7b5ef4efb600f19fc398145"}, -] - -[package.dependencies] -zipp = {version = ">=3.1.0", markers = "python_version < \"3.10\""} - -[package.extras] -docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] -testing = ["jaraco.test (>=5.4)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy", "pytest-ruff (>=0.2.1)", "zipp (>=3.17)"] - [[package]] name = "incremental" version = "22.10.0" @@ -3714,8 +3654,6 @@ files = [ ] [package.dependencies] -"backports.zoneinfo" = {version = ">=0.2.1", markers = "python_version < \"3.9\""} -importlib-resources = {version = ">=5.9.0", markers = "python_version < \"3.9\""} python-dateutil = ">=2.6" tzdata = ">=2020.1" @@ -5026,7 +4964,6 @@ files = [ [package.dependencies] markdown-it-py = ">=2.2.0" pygments = ">=2.13.0,<3.0.0" -typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9\""} [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] @@ -6600,5 +6537,5 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" -python-versions = ">=3.8.1,<3.13" -content-hash = "38baefccadc2b1ebc8c7b4f8702035dd2c60d88c5cd582b148a1cd01c52860ca" +python-versions = ">=3.9,<3.13" +content-hash = "6a657c817cec2ef5e110c455fd86ec73ce82e1e97dea77613ba4400238608594" diff --git a/pyproject.toml b/pyproject.toml index 75beddcc8..13beebf21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,9 +11,8 @@ readme = "README.md" packages = [{include = "sources"}] [tool.poetry.dependencies] -python = ">=3.8.1,<3.13" +python = ">=3.9,<3.13" dlt = {version = "1.3.0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb"]} -graphlib-backport = {version = "*", python = "<3.9"} [tool.poetry.group.dltpure.dependencies] dlt = {version = "1.3.0", allow-prereleases = true} diff --git a/sources/pg_legacy_replication/__init__.py b/sources/pg_legacy_replication/__init__.py index 8c8778ba3..d31ef8d25 100644 --- a/sources/pg_legacy_replication/__init__.py +++ b/sources/pg_legacy_replication/__init__.py @@ -85,7 +85,7 @@ def replication_source( def replication_resource(slot_name: str) -> Iterable[TDataItem]: # start where we left off in previous run start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0) - if flush_slot: + if flush_slot and start_lsn > 0: advance_slot(start_lsn, slot_name, credentials) # continue until last message in replication slot diff --git a/sources/pg_legacy_replication/helpers.py b/sources/pg_legacy_replication/helpers.py index 76be86481..f9e11330a 100644 --- a/sources/pg_legacy_replication/helpers.py +++ b/sources/pg_legacy_replication/helpers.py @@ -231,17 +231,17 @@ def get_max_lsn(credentials: ConnectionStringCredentials) -> Optional[int]: Returns None if the replication slot is empty. Does not consume the slot, i.e. messages are not flushed. """ - cur = _get_conn(credentials).cursor() - loc_fn = ( - "pg_current_xlog_location" - if get_pg_version(cur) < 100000 - else "pg_current_wal_lsn" - ) - # subtract '0/0' to convert pg_lsn type to int (https://stackoverflow.com/a/73738472) - cur.execute(f"SELECT {loc_fn}() - '0/0' as max_lsn;") - lsn: int = cur.fetchone()[0] - cur.connection.close() - return lsn + with _get_conn(credentials) as conn: + cur = conn.cursor() + loc_fn = ( + "pg_current_xlog_location" + if get_pg_version(cur) < 100000 + else "pg_current_wal_lsn" + ) + # subtract '0/0' to convert pg_lsn type to int (https://stackoverflow.com/a/73738472) + cur.execute(f"SELECT {loc_fn}() - '0/0' as max_lsn;") + lsn: int = cur.fetchone()[0] + return lsn def lsn_int_to_hex(lsn: int) -> str: @@ -262,13 +262,14 @@ def advance_slot( This function is used as alternative to psycopg2's `send_feedback` method, because the behavior of that method seems odd when used outside of `consume_stream`. """ - if upto_lsn != 0: - cur = _get_conn(credentials).cursor() + assert upto_lsn > 0 + with _get_conn(credentials) as conn: + cur = conn.cursor() + # There is unfortunately no way in pg9.6 to manually advance the replication slot if get_pg_version(cur) > 100000: cur.execute( f"SELECT * FROM pg_replication_slot_advance('{slot_name}', '{lsn_int_to_hex(upto_lsn)}');" ) - cur.connection.close() def _get_conn( @@ -452,20 +453,19 @@ def __iter__(self) -> Iterator[TableItems]: Maintains LSN of last consumed commit message in object state. Advances the slot only when all messages have been consumed. """ - cur = _get_rep_conn(self.credentials).cursor() - cur.start_replication(slot_name=self.slot_name, start_lsn=self.start_lsn) - consumer = MessageConsumer( - upto_lsn=self.upto_lsn, - table_qnames=self.table_qnames, - repl_options=self.repl_options, - target_batch_size=self.target_batch_size, - ) - try: - cur.consume_stream(consumer) - except StopReplication: # completed batch or reached `upto_lsn` - yield from self.flush_batch(cur, consumer) - finally: - cur.connection.close() + with _get_rep_conn(self.credentials) as conn: + cur = conn.cursor() + cur.start_replication(slot_name=self.slot_name, start_lsn=self.start_lsn) + consumer = MessageConsumer( + upto_lsn=self.upto_lsn, + table_qnames=self.table_qnames, + repl_options=self.repl_options, + target_batch_size=self.target_batch_size, + ) + try: + cur.consume_stream(consumer) + except StopReplication: # completed batch or reached `upto_lsn` + yield from self.flush_batch(cur, consumer) def flush_batch( self, cur: ReplicationCursor, consumer: MessageConsumer