@@ -133,19 +133,9 @@ def init_replication(
133
133
134
134
assert table_names is not None
135
135
136
- engine = _configure_engine (credentials , rep_conn )
137
-
138
- @sa .event .listens_for (engine , "begin" )
139
- def on_begin (conn : sa .Connection ) -> None :
140
- cur = conn .connection .cursor ()
141
- if slot is None :
142
- # Using the same isolation level that pg_backup uses
143
- cur .execute (
144
- "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"
145
- )
146
- else :
147
- cur .execute ("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ" )
148
- cur .execute (f"SET TRANSACTION SNAPSHOT '{ slot ['snapshot_name' ]} '" )
136
+ engine = _configure_engine (
137
+ credentials , rep_conn , slot .get ("snapshot_name" ) if slot else None
138
+ )
149
139
150
140
table_names = [table_names ] if isinstance (table_names , str ) else table_names or []
151
141
@@ -155,16 +145,34 @@ def on_begin(conn: sa.Connection) -> None:
155
145
156
146
157
147
def _configure_engine (
158
- credentials : ConnectionStringCredentials , rep_conn : LogicalReplicationConnection
148
+ credentials : ConnectionStringCredentials ,
149
+ rep_conn : LogicalReplicationConnection ,
150
+ snapshot_name : Optional [str ],
159
151
) -> Engine :
160
152
"""
161
153
Configures the SQLAlchemy engine.
162
154
Also attaches the replication connection in order to prevent it being garbage collected and closed.
155
+
156
+ Args:
157
+ snapshot_name (str, optional): This is used during the initial first table snapshot allowing
158
+ all transactions to run with the same consistent snapshot.
163
159
"""
164
160
engine : Engine = engine_from_credentials (credentials )
165
161
engine .execution_options (stream_results = True , max_row_buffer = 2 * 50000 )
166
162
setattr (engine , "rep_conn" , rep_conn ) # noqa
167
163
164
+ @sa .event .listens_for (engine , "begin" )
165
+ def on_begin (conn : sa .Connection ) -> None :
166
+ cur = conn .connection .cursor ()
167
+ if snapshot_name is None :
168
+ # Using the same isolation level that pg_backup uses
169
+ cur .execute (
170
+ "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"
171
+ )
172
+ else :
173
+ cur .execute ("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ" )
174
+ cur .execute (f"SET TRANSACTION SNAPSHOT '{ snapshot_name } '" )
175
+
168
176
@sa .event .listens_for (engine , "engine_disposed" )
169
177
def on_engine_disposed (e : Engine ) -> None :
170
178
delattr (e , "rep_conn" )
0 commit comments