Feature/add misisng getters datastream #55

Merged · 3 commits · Jul 8, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion dsh_sdk/Cargo.toml
@@ -9,7 +9,7 @@ license.workspace = true
name = "dsh_sdk"
readme = 'README.md'
repository.workspace = true
version = "0.4.2"
version = "0.4.3"

[package.metadata.docs.rs]
all-features = true
138 changes: 137 additions & 1 deletion dsh_sdk/src/dsh/datastream.rs
@@ -226,6 +226,55 @@ pub struct Stream {
}

impl Stream {
/// Get the Stream's name
pub fn name(&self) -> &str {
&self.name
}

/// Get the Stream's cluster
pub fn cluster(&self) -> &str {
&self.cluster
}

/// Get the read pattern as stated in datastreams.
///
/// Use the `read_pattern` method to validate whether read access is allowed.
pub fn read(&self) -> &str {
&self.read
}

/// Get the write pattern
///
/// Use the `write_pattern` method to validate whether write access is allowed.
pub fn write(&self) -> &str {
&self.write
}

/// Get the Stream's number of partitions
pub fn partitions(&self) -> i32 {
self.partitions
}

/// Get the Stream's replication factor
pub fn replication(&self) -> i32 {
self.replication
}

/// Get the Stream's partitioner
pub fn partitioner(&self) -> &str {
&self.partitioner
}

/// Get the Stream's partitioning depth
pub fn partitioning_depth(&self) -> i32 {
self.partitioning_depth
}

/// Get the Stream's `can_retain` value
pub fn can_retain(&self) -> bool {
self.can_retain
}

/// Check read access on topic based on datastream
pub fn read_access(&self) -> bool {
!self.read.is_empty()
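
Note (illustrative, not part of the diff): a minimal sketch of how the new getters might be used once a Stream has been obtained, for example via `streams()` as in the tests further down. The import path is assumed from the file layout (dsh_sdk/src/dsh/datastream.rs) and may differ from the crate's public re-exports.

use dsh_sdk::dsh::datastream::Stream;

// Print the properties exposed by the newly added getters.
// Illustrative only; the import path above is an assumption.
fn describe_stream(stream: &Stream) {
    println!("name:               {}", stream.name());
    println!("cluster:            {}", stream.cluster());
    println!("partitions:         {}", stream.partitions());
    println!("replication:        {}", stream.replication());
    println!("partitioner:        {}", stream.partitioner());
    println!("partitioning depth: {}", stream.partitioning_depth());
    println!("can retain:         {}", stream.can_retain());
    println!("raw read pattern:   {:?}", stream.read());
    println!("raw write pattern:  {:?}", stream.write());
}
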
@@ -237,6 +286,9 @@ impl Stream {
}

/// Get the Stream's Read whitelist pattern
///
/// ## Error
/// If the topic does not have read access, it returns a `TopicPermissionsError`.
pub fn read_pattern(&self) -> Result<&str, DshError> {
if self.read_access() {
Ok(&self.read)
@@ -249,6 +301,9 @@ impl Stream {
}

/// Get the Stream's Write pattern
///
/// ## Error
/// If the topic does not have write access, it returns a `TopicPermissionsError`.
pub fn write_pattern(&self) -> Result<&str, DshError> {
if self.write_access() {
Ok(&self.write)
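
Note (illustrative, not part of the diff): a sketch of handling the documented error case for `read_pattern` and `write_pattern`, assuming the same import path as above and that `DshError` implements `Display` (not shown in this diff).

use dsh_sdk::dsh::datastream::Stream;

// Fall back gracefully when the stream grants no read or write access.
// Illustrative only; error rendering assumes `DshError: Display`.
fn access_summary(stream: &Stream) {
    match stream.read_pattern() {
        Ok(pattern) => println!("subscribe with pattern: {}", pattern),
        Err(e) => eprintln!("no read access on {}: {}", stream.name(), e),
    }
    match stream.write_pattern() {
        Ok(pattern) => println!("produce to: {}", pattern),
        Err(e) => eprintln!("no write access on {}: {}", stream.name(), e),
    }
}
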
@@ -342,7 +397,7 @@ mod tests {
"replication": 1,
"partitioner": "default-partitioner",
"partitioningDepth": 0,
"canRetain": false
"canRetain": true
}
},
"private_consumer_groups": [
@@ -363,6 +418,87 @@
.to_string()
}

#[test]
fn test_name() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.name(), "scratch.test");
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.name(), "stream.test");
}

#[test]
fn test_read() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.read(), "scratch.test.test-tenant");
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.read(), "stream\\.test\\.[^.]*");
}

#[test]
fn test_write() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.write(), "scratch.test.test-tenant");
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.write(), "");
}

#[test]
fn test_cluster() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.cluster(), "/tt");
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.cluster(), "/tt");
}

#[test]
fn test_partitions() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.partitions(), 3);
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.partitions(), 1);
}

#[test]
fn test_replication() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.replication(), 1);
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.replication(), 1);
}

#[test]
fn test_partitioner() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.partitioner(), "default-partitioner");
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.partitioner(), "default-partitioner");
}

#[test]
fn test_partitioning_depth() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.partitioning_depth(), 0);
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.partitioning_depth(), 0);
}

#[test]
fn test_can_retain() {
let datastream = datastream();
let stream = datastream.streams().get("scratch.test").unwrap();
assert_eq!(stream.can_retain(), false);
let stream = datastream.streams().get("stream.test").unwrap();
assert_eq!(stream.can_retain(), true);
}

#[test]
fn test_datastream_get_brokers() {
assert_eq!(