Skip to content

Commit 297945b

Browse files
committed
Improve metadata bwc test for Logical Replication
1 parent 8b0ebe2 commit 297945b

File tree

1 file changed

+52
-2
lines changed

1 file changed

+52
-2
lines changed

tests/bwc/test_rolling_upgrade.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from crate.client import connect
33
from crate.client.exceptions import ProgrammingError
44

5-
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath
5+
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy
66

77
ROLLING_UPGRADES_V4 = (
88
# 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
@@ -41,7 +41,7 @@
4141
UpgradePath('5.8.x', '5.9.x'),
4242
UpgradePath('5.9.x', '5.10.x'),
4343
UpgradePath('5.10.x', '6.0.x'),
44-
UpgradePath('6.0.x', 'latest-nightly'),
44+
UpgradePath('6.0.x', 'branch:jeeminso/jeeminso/lr-broken-subscription'),
4545
)
4646

4747

@@ -88,6 +88,10 @@ def _test_rolling_upgrade(self, path, nodes):
8888
}
8989
cluster = self._new_cluster(path.from_version, nodes, settings=settings)
9090
cluster.start()
91+
replica_cluster = None
92+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
93+
replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
94+
replica_cluster.start()
9195
with connect(cluster.node().http_url, error_trace=True) as conn:
9296
c = conn.cursor()
9397
c.execute("create user arthur with (password = 'secret')")
@@ -152,6 +156,33 @@ def _test_rolling_upgrade(self, path, nodes):
152156
# Add the shards of the new partition primaries
153157
expected_active_shards += shards
154158

159+
# Set up tables for logical replications
160+
def num_docs_x(cursor):
161+
cursor.execute("select count(*) from doc.x")
162+
return cursor.fetchall()[0][0]
163+
164+
def num_docs_rx(cursor):
165+
cursor.execute("select count(*) from doc.rx")
166+
return cursor.fetchall()[0][0]
167+
168+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
169+
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
170+
c.execute("create publication p for table doc.x")
171+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
172+
rc = replica_conn.cursor()
173+
transport_port = cluster.node().addresses.transport.port
174+
replica_transport_port = replica_cluster.node().addresses.transport.port
175+
assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310
176+
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
177+
rc.execute("create publication rp for table doc.rx")
178+
rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p")
179+
wait_for_active_shards(rc, 2) # doc.rx created via create-table and doc.x that is subscribed
180+
assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0))
181+
c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp")
182+
expected_active_shards += 2
183+
wait_for_active_shards(c, expected_active_shards)
184+
assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0))
185+
155186
for idx, node in enumerate(cluster):
156187
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
157188
# Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
@@ -282,6 +313,25 @@ def _test_rolling_upgrade(self, path, nodes):
282313
c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])
283314
self.assertEqual(c.fetchall(), [[partition_version]])
284315

316+
# Ensure logical replications works
317+
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:
318+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
319+
rc = replica_conn.cursor()
320+
321+
# Cannot drop replicated tables
322+
with self.assertRaises(ProgrammingError):
323+
rc.execute("drop table doc.x")
324+
c.execute("drop table doc.rx")
325+
326+
count = num_docs_x(rc)
327+
count2 = num_docs_rx(c)
328+
329+
c.execute("insert into doc.x values (1)")
330+
rc.execute("insert into doc.rx values (1)")
331+
332+
assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1))
333+
assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1))
334+
285335
# Finally validate that all shards (primaries and replicas) of all partitions are started
286336
# and writes into the partitioned table while upgrading were successful
287337
with connect(cluster.node().http_url, error_trace=True) as conn:

0 commit comments

Comments
 (0)