Skip to content

Commit 72c44d9

Browse files
committed
integration test
1 parent 08950ca commit 72c44d9

File tree

1 file changed

+305
-0
lines changed

1 file changed

+305
-0
lines changed
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
# Copyright 2025 Oliver Lambson
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
"""Integration tests for jetstreampcg.
15+
16+
These tests are ported from orbit.go/pcgroups/test/stream_consumer_group_test.go
17+
"""
18+
19+
import asyncio
20+
21+
import pytest
22+
from nats.js import JetStreamContext
23+
from nats.js.api import AckPolicy, ConsumerConfig, StreamConfig, SubjectTransform
24+
25+
from jetstreampcg.elastic import (
26+
add_members,
27+
create_elastic,
28+
delete_elastic,
29+
delete_members,
30+
)
31+
from jetstreampcg.static import create_static, delete_static, static_consume
32+
33+
34+
@pytest.mark.asyncio
35+
class TestStaticIntegration:
36+
"""Integration tests for static consumer groups.
37+
38+
Ported from orbit.go/pcgroups/test/stream_consumer_group_test.go:TestStatic
39+
"""
40+
41+
async def test_static_consumer_group(self, js_client: JetStreamContext):
42+
"""Test static consumer group with two members consuming messages in parallel."""
43+
stream_name = "test-static"
44+
cg_name = "group"
45+
c1_count = 0
46+
c2_count = 0
47+
48+
# Create a stream with subject transform for partitioning
49+
await js_client.add_stream(
50+
StreamConfig(
51+
name=stream_name,
52+
subjects=["bar.*"],
53+
subject_transform=SubjectTransform(
54+
src="bar.*", dest="{{partition(2,1)}}.bar.{{wildcard(1)}}"
55+
),
56+
)
57+
)
58+
59+
# Publish 10 messages
60+
for i in range(10):
61+
await js_client.publish(f"bar.{i}", b"payload")
62+
63+
# Consumer config
64+
consumer_config = ConsumerConfig(
65+
max_ack_pending=1,
66+
ack_wait=1.0,
67+
ack_policy=AckPolicy.EXPLICIT,
68+
)
69+
70+
# Create static consumer group with 2 members
71+
await create_static(
72+
js_client,
73+
stream_name,
74+
cg_name,
75+
max_members=2,
76+
filter="bar.*",
77+
members=["m1", "m2"],
78+
member_mappings=[],
79+
)
80+
81+
# Track when to stop consuming
82+
stop_event = asyncio.Event()
83+
84+
# Consumer 1
85+
async def consume_m1():
86+
nonlocal c1_count
87+
88+
async def m1_handler(msg):
89+
nonlocal c1_count
90+
c1_count += 1
91+
await msg.ack()
92+
93+
ctx = await static_consume(
94+
js_client, stream_name, cg_name, "m1", m1_handler, consumer_config
95+
)
96+
97+
# Wait for stop signal
98+
await stop_event.wait()
99+
ctx.stop()
100+
await ctx.done()
101+
102+
# Consumer 2
103+
async def consume_m2():
104+
nonlocal c2_count
105+
106+
async def m2_handler(msg):
107+
nonlocal c2_count
108+
c2_count += 1
109+
await msg.ack()
110+
111+
ctx = await static_consume(
112+
js_client, stream_name, cg_name, "m2", m2_handler, consumer_config
113+
)
114+
115+
# Wait for stop signal
116+
await stop_event.wait()
117+
ctx.stop()
118+
await ctx.done()
119+
120+
# Start both consumers
121+
task1 = asyncio.create_task(consume_m1())
122+
task2 = asyncio.create_task(consume_m2())
123+
124+
# Wait for all messages to be consumed (with timeout)
125+
start_time = asyncio.get_event_loop().time()
126+
while c1_count + c2_count < 10:
127+
await asyncio.sleep(0.1)
128+
if asyncio.get_event_loop().time() - start_time > 5:
129+
pytest.fail("Timeout waiting for messages to be consumed")
130+
131+
# Signal consumers to stop
132+
stop_event.set()
133+
134+
# Wait for consumers to finish
135+
await asyncio.gather(task1, task2)
136+
137+
# Verify all messages were consumed
138+
assert c1_count + c2_count == 10
139+
140+
# Clean up
141+
await delete_static(js_client, stream_name, cg_name)
142+
143+
144+
@pytest.mark.asyncio
145+
class TestElasticIntegration:
146+
"""Integration tests for elastic consumer groups.
147+
148+
Ported from orbit.go/pcgroups/test/stream_consumer_group_test.go:TestElastic
149+
"""
150+
151+
async def test_elastic_consumer_group_with_membership_changes(
152+
self, js_client: JetStreamContext
153+
):
154+
"""Test elastic consumer group with dynamic member addition and removal."""
155+
stream_name = "test-elastic"
156+
cg_name = "group"
157+
c1_count = 0
158+
c2_count = 0
159+
160+
# Create a stream
161+
await js_client.add_stream(
162+
StreamConfig(
163+
name=stream_name,
164+
subjects=["bar.*"],
165+
)
166+
)
167+
168+
# Publish 10 messages
169+
for i in range(10):
170+
await js_client.publish(f"bar.{i}", b"payload")
171+
172+
# Consumer config
173+
consumer_config = ConsumerConfig(
174+
max_ack_pending=1,
175+
ack_wait=1.0,
176+
ack_policy=AckPolicy.EXPLICIT,
177+
)
178+
179+
# Create elastic consumer group with max 2 members
180+
await create_elastic(
181+
js_client,
182+
stream_name,
183+
cg_name,
184+
max_num_members=2,
185+
filter="bar.*",
186+
partitioning_wildcards=[1],
187+
)
188+
189+
# Track when to stop consuming
190+
stop_event_m1 = asyncio.Event()
191+
stop_event_m2 = asyncio.Event()
192+
193+
# Consumer 1
194+
async def consume_m1():
195+
nonlocal c1_count
196+
197+
async def m1_handler(msg):
198+
nonlocal c1_count
199+
c1_count += 1
200+
await msg.ack()
201+
202+
from jetstreampcg.elastic import elastic_consume
203+
204+
ctx = await elastic_consume(
205+
js_client, stream_name, cg_name, "m1", m1_handler, consumer_config
206+
)
207+
208+
# Wait for stop signal
209+
await stop_event_m1.wait()
210+
ctx.stop()
211+
await ctx.done()
212+
213+
# Consumer 2
214+
async def consume_m2():
215+
nonlocal c2_count
216+
217+
async def m2_handler(msg):
218+
nonlocal c2_count
219+
c2_count += 1
220+
await msg.ack()
221+
222+
from jetstreampcg.elastic import elastic_consume
223+
224+
ctx = await elastic_consume(
225+
js_client, stream_name, cg_name, "m2", m2_handler, consumer_config
226+
)
227+
228+
# Wait for stop signal
229+
await stop_event_m2.wait()
230+
ctx.stop()
231+
await ctx.done()
232+
233+
# Start both consumers
234+
task1 = asyncio.create_task(consume_m1())
235+
task2 = asyncio.create_task(consume_m2())
236+
237+
# Add only m1 to membership
238+
await add_members(js_client, stream_name, cg_name, ["m1"])
239+
240+
# Wait for m1 to consume all 10 messages (m2 should not consume any)
241+
start_time = asyncio.get_event_loop().time()
242+
while c1_count != 10 or c2_count != 0:
243+
await asyncio.sleep(0.1)
244+
if asyncio.get_event_loop().time() - start_time > 5:
245+
pytest.fail(
246+
f"Timeout: expected c1=10, c2=0, got c1={c1_count}, c2={c2_count}"
247+
)
248+
249+
assert c1_count == 10
250+
assert c2_count == 0
251+
252+
# Add m2 to membership
253+
await add_members(js_client, stream_name, cg_name, ["m2"])
254+
255+
# Wait a bit for m2 to be effectively added
256+
await asyncio.sleep(0.05)
257+
258+
# Publish 10 more messages
259+
for i in range(10):
260+
await js_client.publish(f"bar.{i}", b"payload")
261+
262+
# Wait for messages to be split between m1 and m2
263+
start_time = asyncio.get_event_loop().time()
264+
while c1_count + c2_count < 20:
265+
await asyncio.sleep(0.1)
266+
if asyncio.get_event_loop().time() - start_time > 10:
267+
pytest.fail(
268+
f"Timeout: expected total=20, got c1={c1_count}, c2={c2_count}"
269+
)
270+
271+
# Both should have consumed some messages (split between them)
272+
assert c1_count == 15
273+
assert c2_count == 5
274+
275+
# Remove m1 from membership
276+
await delete_members(js_client, stream_name, cg_name, ["m1"])
277+
278+
# Wait a bit for m1 to be effectively deleted
279+
await asyncio.sleep(0.05)
280+
281+
# Publish 10 more messages
282+
for i in range(10):
283+
await js_client.publish(f"bar.{i}", b"payload")
284+
285+
# Wait for m2 to consume all new messages (m1 should not consume any more)
286+
start_time = asyncio.get_event_loop().time()
287+
while c1_count != 15 or c2_count != 15:
288+
await asyncio.sleep(0.1)
289+
if asyncio.get_event_loop().time() - start_time > 10:
290+
pytest.fail(
291+
f"Timeout: expected c1=15, c2=15, got c1={c1_count}, c2={c2_count}"
292+
)
293+
294+
assert c1_count == 15
295+
assert c2_count == 15
296+
297+
# Signal consumers to stop
298+
stop_event_m1.set()
299+
stop_event_m2.set()
300+
301+
# Wait for consumers to finish
302+
await asyncio.gather(task1, task2)
303+
304+
# Clean up
305+
await delete_elastic(js_client, stream_name, cg_name)

0 commit comments

Comments
 (0)