Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions bdd/go/tests/basic_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,47 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex
return nil
}

func (s basicMessagingSteps) whenUpdateStreamName(ctx context.Context, newName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
if err := c.client.UpdateStream(streamIdentifier, newName); err != nil {
return fmt.Errorf("failed to update stream: %w", err)
}
c.lastStreamName = &newName
return nil
}

func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expectedName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
stream, err := c.client.GetStream(streamIdentifier)
if err != nil {
return fmt.Errorf("failed to get stream: %w", err)
}
if stream.Name != expectedName {
return fmt.Errorf("expected stream name %s, got %s", expectedName, stream.Name)
}
return nil
}

func (s basicMessagingSteps) whenDeleteStream(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
if err := c.client.DeleteStream(streamIdentifier); err != nil {
return fmt.Errorf("failed to delete stream: %w", err)
}
c.lastStreamID = nil
return nil
}

func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
if c.lastStreamID != nil {
return errors.New("stream ID should be nil after deletion")
}
return nil
}
Comment on lines +245 to +251
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to call GetStream here to verify whether the stream is actually deleted.


func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {
client := getBasicMessagingCtx(ctx).client
streams, err := client.GetStreams()
Expand Down Expand Up @@ -322,10 +363,20 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) {
sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions)
sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName)
sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated)
sc.Step(`I delete the stream`, s.whenDeleteStream)
sc.Step(`the stream should be deleted successfully`, s.thenStreamDeletedSuccessfully)
Comment on lines +368 to +369
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend ^...$ for consistency. e.g. ^I delete the stream$

sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) {
c := getBasicMessagingCtx(ctx)
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
if c.client != nil && c.lastStreamID != nil {
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
_ = c.client.DeleteStream(streamIdentifier)
}
if c.client != nil {
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
}
Comment on lines +372 to +379
Copy link
Copy Markdown
Contributor

@chengxilo chengxilo Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure it’s worth cleaning this up. Even if we do want to, this approach isn’t ideal. We should probably provide a script that cleans up all resources instead (e.g., fetch all streams, topics, users, etc., and delete them), which would work across all scenarios and ensure everything is properly cleaned.

For example, if CreateStream(A) actually creates a stream B, but DeleteStream(A) only attempts to delete A, then who is responsible for deleting B?

}
return ctx, scErr
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BasicMessagingSteps {
Expand Down Expand Up @@ -209,6 +210,30 @@ public void lastPolledMessageMatchesSent() {
assertEquals(context.lastSentMessage, lastPayload, "Last message should match sent message");
}

@When("I update the stream name to {string}")
public void updateStreamName(String newName) {
getClient().streams().updateStream(context.lastStreamId, newName);
context.lastStreamName = newName;
}

@Then("the stream name should be updated to {string}")
public void streamNameUpdated(String expectedName) {
Optional<StreamDetails> stream = getClient().streams().getStream(context.lastStreamId);
assertTrue(stream.isPresent(), "Stream should exist");
assertEquals(expectedName, stream.get().name(), "Stream name should be updated");
}

@When("I delete the stream")
public void deleteStream() {
getClient().streams().deleteStream(context.lastStreamId);
context.lastStreamId = null;
}

@Then("the stream should be deleted successfully")
public void streamDeletedSuccessfully() {
assertNull(context.lastStreamId, "Stream should have been deleted");
}

private IggyBaseClient getClient() {
if (context.client == null) {
throw new IllegalStateException("Iggy client not initialized");
Expand Down
40 changes: 40 additions & 0 deletions bdd/python/tests/test_basic_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,43 @@ def verify_last_message_match(context):
last_polled_payload = last_polled.payload().decode("utf-8")

assert last_polled_payload == context.last_sent_message


@when(parsers.parse('I update the stream name to "{new_name}"'))
def update_stream_name(context, new_name):
"""Update the stream name"""

async def _update():
await context.client.update_stream(context.last_stream_id, new_name)
context.last_stream_name = new_name

asyncio.run(_update())


@then(parsers.parse('the stream name should be updated to "{expected_name}"'))
def verify_stream_name_updated(context, expected_name):
"""Verify stream name was updated"""

async def _verify():
stream = await context.client.get_stream(context.last_stream_id)
assert stream is not None
assert stream.name == expected_name

asyncio.run(_verify())


@when("I delete the stream")
def delete_stream(context):
"""Delete the stream"""

async def _delete():
await context.client.delete_stream(context.last_stream_id)
context.last_stream_id = None

asyncio.run(_delete())


@then("the stream should be deleted successfully")
def verify_stream_deleted(context):
"""Verify stream was deleted"""
assert context.last_stream_id is None
47 changes: 46 additions & 1 deletion bdd/rust/tests/steps/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::common::global_context::GlobalContext;
use cucumber::{given, then, when};
use iggy::prelude::StreamClient;
use iggy::prelude::{Identifier, StreamClient};

#[given("I have no streams in the system")]
pub async fn given_no_streams(world: &mut GlobalContext) {
Expand Down Expand Up @@ -64,3 +64,48 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri
"Stream should have expected name"
);
}

#[when(regex = r#"^I update the stream name to "([^"]*)"$"#)]
pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
client
.update_stream(&identifier, &new_name)
.await
.expect("Should be able to update stream");
world.last_stream_name = Some(new_name);
}

#[then(regex = r#"^the stream name should be updated to "([^"]*)"$"#)]
pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
let stream = client
.get_stream(&identifier)
.await
.expect("Should be able to get stream")
.expect("Stream should exist");
assert_eq!(stream.name, expected_name, "Stream name should be updated");
}

#[when("I delete the stream")]
pub async fn when_delete_stream(world: &mut GlobalContext) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
client
.delete_stream(&identifier)
.await
.expect("Should be able to delete stream");
world.last_stream_id = None;
}

#[then("the stream should be deleted successfully")]
pub async fn then_stream_deleted_successfully(world: &mut GlobalContext) {
assert!(
world.last_stream_id.is_none(),
"Stream should have been deleted"
);
}
6 changes: 6 additions & 0 deletions bdd/scenarios/basic_messaging.feature
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ Feature: Basic Messaging Operations
And the messages should have sequential offsets from 0 to 9
And each message should have the expected payload content
And the last polled message should match the last sent message

When I update the stream name to "test-stream-updated"
Then the stream name should be updated to "test-stream-updated"

When I delete the stream
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it would be better to have "stream with name" here (or "stream with id") to avoid implicit deduction of stream_name based on previously added/modified streams.

Then the stream should be deleted successfully
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,38 @@ public void ThenTheLastPolledMessageShouldMatchTheLastSentMessage()
lastPolled.Header.Id.ShouldBe(_context.LastSendMessage.Header.Id);
lastPolled.Payload.ShouldBe(_context.LastSendMessage.Payload);
}

[When("I update the stream name to {string}")]
public async Task WhenIUpdateTheStreamNameTo(string newName)
{
_context.CreatedStream.ShouldNotBeNull();
await _context.IggyClient.UpdateStreamAsync(
Identifier.Numeric(_context.CreatedStream!.Id), newName);
_context.CreatedStream = await _context.IggyClient.GetStreamByIdAsync(
Identifier.Numeric(_context.CreatedStream.Id));
}

[Then("the stream name should be updated to {string}")]
public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName)
{
_context.CreatedStream.ShouldNotBeNull();
_context.CreatedStream!.Name.ShouldBe(expectedName);
}

[When(@"I delete the stream")]
public async Task WhenIDeleteTheStream()
{
_context.CreatedStream.ShouldNotBeNull();
await _context.IggyClient.DeleteStreamAsync(
Identifier.Numeric(_context.CreatedStream!.Id));
_context.CreatedStream = null;
}

[Then(@"the stream should be deleted successfully")]
public void ThenTheStreamShouldBeDeletedSuccessfully()
{
_context.CreatedStream.ShouldBeNull();
}
}

// Test context for sharing data between steps
35 changes: 35 additions & 0 deletions foreign/node/src/bdd/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ Then(
}
);

When(
'I update the stream name to {string}',
async function (this: TestWorld, newName: string) {
assert.ok(await this.client.stream.update({
streamId: this.stream.id,
name: newName
}));
this.stream = { ...this.stream, name: newName };
}
);

Then(
'the stream name should be updated to {string}',
async function (this: TestWorld, expectedName: string) {
const stream = await this.client.stream.get({ streamId: this.stream.id });
assert.ok(stream, 'Stream should exist after update');
assert.equal(stream!.name, expectedName);
}
);

When(
'I delete the stream',
async function (this: TestWorld) {
assert.ok(await this.client.stream.delete({ streamId: this.stream.id }));
}
);

Then(
'the stream should be deleted successfully',
async function (this: TestWorld) {
// If we reached here without error, the stream was deleted successfully
assert.ok(true);
}
);

// Cleanup: delete stream after test
Then(
'I can delete stream with ID {int}',
Expand Down
16 changes: 16 additions & 0 deletions foreign/python/apache_iggy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,22 @@ class IggyClient:

Returns Option of stream details or a PyRuntimeError on failure.
"""
def update_stream(
self, stream_id: builtins.str | builtins.int, name: builtins.str
) -> collections.abc.Awaitable[None]:
r"""
Updates a stream's name.

Returns Ok(()) on successful stream update or a PyRuntimeError on failure.
"""
def delete_stream(
self, stream_id: builtins.str | builtins.int
) -> collections.abc.Awaitable[None]:
r"""
Deletes a stream by id.

Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure.
"""
def create_topic(
self,
stream: builtins.str | builtins.int,
Expand Down
42 changes: 42 additions & 0 deletions foreign/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,48 @@ impl IggyClient {
})
}

/// Updates a stream's name.
///
/// Returns Ok(()) on successful stream update or a PyRuntimeError on failure.
#[pyo3(signature = (stream_id, name))]
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn update_stream<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
name: String,
) -> PyResult<Bound<'a, PyAny>> {
let stream_id = Identifier::from(stream_id);
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.update_stream(&stream_id, &name)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
Ok(())
})
}

/// Deletes a stream by id.
///
/// Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn delete_stream<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
) -> PyResult<Bound<'a, PyAny>> {
let stream_id = Identifier::from(stream_id);
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.delete_stream(&stream_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
Ok(())
})
}

/// Creates a new topic with the given parameters.
///
/// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.
Expand Down
Loading