Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor streaming_search_inner to return a Stream object #15

Merged
merged 13 commits into from
Oct 29, 2024

Conversation

keaz
Copy link
Owner

@keaz keaz commented Oct 9, 2024

This commit refactors the streaming_search_inner method in the LdapClient struct to return a Stream object instead of a vector of SearchEntry objects. The Stream object encapsulates the search stream and provides methods for iterating over the search results. This change improves the usability and flexibility of the streaming_search method.

This commit refactors the `streaming_search_inner` method in the `LdapClient` struct to return a `Stream` object instead of a vector of `SearchEntry` objects. The `Stream` object encapsulates the search stream and provides methods for iterating over the search results. This change improves the usability and flexibility of the `streaming_search` method.
@keaz keaz self-assigned this Oct 9, 2024
@keaz keaz linked an issue Oct 9, 2024 that may be closed by this pull request
Copy link

@StarlessNights StarlessNights left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to review!

Cargo.toml Outdated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the return type of a function warrants a major version bump.

PS. cargo-semver-checks is a pretty neat semver linter.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bumped major version

@@ -1225,6 +1113,114 @@ impl LdapClient {
}
}

pub struct Stream<'a, S, A> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still only a "stream like object" (just like in ldap3). We want to return an object that implements the futures::stream::Stream trait. I confess that I haven't ever implemented this myself, but taking inspiration from page_turner::mt::PageTurner::pages_ahead it would seem like futures::stream::try_unfold would be able to do most of the work. Probably you can more or less just wrap the code you just wrote inside that.

What's the point you may ask? With Stream implemented we can then bring in StreamExt trait which allows us to use map, filter etc all the iterator like stuff on the stream.

Copy link
Owner Author

@keaz keaz Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StarlessNights ,Implemented Stream trait for Stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, going the hardcore way I see.

One more thing sprung to my mind. The stream object may be dropped at any point. We should implement Drop and try to abandon the search there, like you've done in next_inner. I suspect that it will be slightly awkward as it's not an async function, but perhaps something like async-dropper could help there.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't easy to implement async-dropper as it expects Eq to be implemented on the Stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah hopefully we'll get language support at some point. Perhaps just blocking for the abandoning would be an acceptable compromise. Honestly I don't know how angry the LDAP/AD servers get if you just leave a search dangling?

@keaz keaz requested a review from StarlessNights October 14, 2024 08:50
@StarlessNights
Copy link

Thanks, I'll take a closer look in the upcoming days.

Copy link

@StarlessNights StarlessNights left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! With a few tweaks we're there I think.

src/lib.rs Outdated
return Poll::Ready(None);
}
// Ensure the task is woken when the next record is ready
cx.waker().wake_by_ref();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should wake here by ourselves. The inner future will do the waking when it's ready. It has the same context.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are correct.

src/lib.rs Outdated
@@ -450,7 +455,7 @@ impl LdapClient {
base: &'a str,
scope: Scope,
filter: &'a impl Filter,
limit: i32,
limit: usize,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit is a spurious argument. The caller can achieve the same by calling take(n) on the stream.

On the other hand perhaps allowing the user to customize the page-size would be useful here? This would imply calling streaming_search_with internally. See this example in ldap3.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the limit and added a separate method for streaming_search_with

@@ -1225,6 +1113,114 @@ impl LdapClient {
}
}

pub struct Stream<'a, S, A> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, going the hardcore way I see.

One more thing sprung to my mind. The stream object may be dropped at any point. We should implement Drop and try to abandon the search there, like you've done in next_inner. I suspect that it will be slightly awkward as it's not an async function, but perhaps something like async-dropper could help there.


impl Record {
pub fn to_record<T: for<'b> serde::Deserialize<'b>>(self) -> Result<T, Error> {
let json = LdapClient::create_json_signle_value(self.search_entry).unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to streams anymore but this intermediate conversion to JSON seems unnecessary? serde_json could probably be refactored out completely, but perhaps that's a matter for another PR.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is something that I want to improve. I will work on this in a separate PR.

src/lib.rs Outdated
.await?;

Ok(entry)
}

///
/// This method is used to search multiple records from the LDAP server and results will be pageinated.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small typo here. paginated

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

src/lib.rs Outdated
let search_stream = self
.ldap
.streaming_search_with(
PagedResults::new(page_size),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EntriesOnly adapter is needed here too according to this comment in ldap3.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1225,6 +1113,114 @@ impl LdapClient {
}
}

pub struct Stream<'a, S, A> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah hopefully we'll get language support at some point. Perhaps just blocking for the abandoning would be an acceptable compromise. Honestly I don't know how angry the LDAP/AD servers get if you just leave a search dangling?

src/lib.rs Outdated
@@ -1195,6 +1254,13 @@ where
StreamResult::Finished => Ok(StreamResult::Finished),
}
}

pub async fn cleanup(&mut self) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...Or manual cleanup isn't so bad either, as long as it's documented.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented the cleanup steps.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The print line got left in there. Probably it was just for debugging?

We probably shouldn't abandon searches that have been run to completion. How about checking the stream state and abandoning only on non-finished states?

The unwrap is kinda scary too. Perhaps just return a result?

@keaz keaz requested a review from StarlessNights October 21, 2024 16:04
/// }
/// }
/// }
/// stream.cleanup().await;
/// }
/// ```
pub async fn streaming_search_with<'a>(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps something like steaming_search_paged would be a better name, as the caller is not allowed to directly specify the adapters.

Also, do we really need to consume self here?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that the consuming makes some sense given that we're using a pool 🤔.

How about the naming though @keaz?

src/lib.rs Outdated
@@ -1195,6 +1254,13 @@ where
StreamResult::Finished => Ok(StreamResult::Finished),
}
}

pub async fn cleanup(&mut self) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The print line got left in there. Probably it was just for debugging?

We probably shouldn't abandon searches that have been run to completion. How about checking the stream state and abandoning only on non-finished states?

The unwrap is kinda scary too. Perhaps just return a result?

README.md Outdated
@@ -159,7 +159,7 @@ async fn main() -> Result<()> {
}
}
assert!(count == 2);
Ok(ldap.unbind().await?)
Ok(result.cleanup()?)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should still be awaited. (Remember to take into the account the potential change to cleanups type.)

@StarlessNights
Copy link

The CI tests are failing?

@keaz
Copy link
Owner Author

keaz commented Oct 23, 2024

The CI tests are failing?

Yes, it never worked. I fixed it now.

@keaz keaz added the enhancement New feature or request label Oct 24, 2024
@keaz keaz requested a review from StarlessNights October 25, 2024 01:00
Removed pull request from ci.yml
Copy link

@StarlessNights StarlessNights left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have zero experience on the github ci stuff so I cannot comment on that.

On the rust side of things all seems good. (Apart from the slightly confusingly named streaming_search_with)

@keaz keaz merged commit 839ea08 into main Oct 29, 2024
1 check passed
@keaz keaz deleted the streaming-search-that-returns-a-stream branch October 29, 2024 10:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Streaming search that returns a stream
2 participants