Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor streaming_search_inner to return a Stream object #15

Merged
merged 13 commits into from
Oct 29, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn main() -> Result<()> {
}
}
assert!(count == 2);
Ok(ldap.unbind().await?)
Ok(result.cleanup()?)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should still be awaited. (Remember to take into the account the potential change to cleanups type.)

}
```

Expand Down
165 changes: 127 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use deadpool::managed::{Object, PoolError};
use filter::{AndFilter, EqFilter, Filter};
use futures::{future::BoxFuture, FutureExt};
use ldap3::{
adapters::PagedResults,
adapters::{Adapter, EntriesOnly, PagedResults},
log::{debug, error},
Ldap, LdapError, Mod, Scope, SearchEntry, SearchStream, StreamState,
};
Expand Down Expand Up @@ -411,6 +411,7 @@ impl LdapClient {
///
/// This method is used to search multiple records from the LDAP server. The search is performed using the provided filter.
/// Method will return a Stream. The stream can be used to iterate through the search results.
/// This method take the ownership of the LdapClient.
///
/// # Arguments
/// * `base` - The base DN to search for the user
Expand Down Expand Up @@ -446,7 +447,19 @@ impl LdapClient {
/// let mut ldap = pool.get_connection().await;
///
/// let name_filter = EqFilter::from("cn".to_string(), "Sam".to_string());
/// let user = ldap.streaming_search::<User>("", self::ldap3::Scope::OneLevel, &name_filter,vec!["cn", "sn", "uid"]).await;
/// let mut stream = ldap.streaming_search("", self::ldap3::Scope::OneLevel, &name_filter, vec!["cn", "sn", "uid"], 3).await.unwrap();
/// while let Some(record) = stream.next().await {
/// match record {
/// Ok(record) => {
/// let user: User = record.to_record().unwrap();
/// println!("User: {:?}", user);
/// }
/// Err(err) => {
/// println!("Error: {:?}", err);
/// }
/// }
/// }
/// stream.cleanup().await;
/// }
/// ```
pub async fn streaming_search<'a>(
Expand All @@ -464,8 +477,9 @@ impl LdapClient {
}

///
/// This method is used to search multiple records from the LDAP server and results will be pageinated.
/// This method is used to search multiple records from the LDAP server and results will be paginated.
/// Method will return a Stream. The stream can be used to iterate through the search results.
/// This method take the ownership of the LdapClient.
///
/// # Arguments
/// * `base` - The base DN to search for the user
Expand Down Expand Up @@ -502,7 +516,19 @@ impl LdapClient {
/// let mut ldap = pool.get_connection().await;
///
/// let name_filter = EqFilter::from("cn".to_string(), "Sam".to_string());
/// let user = ldap.streaming_search::<User>("", self::ldap3::Scope::OneLevel, &name_filter, 3, vec!["cn", "sn", "uid"]).await;
/// let mut stream = ldap.streaming_search("", self::ldap3::Scope::OneLevel, &name_filter, vec!["cn", "sn", "uid"], 3).await.unwrap();
/// while let Some(record) = stream.next().await {
/// match record {
/// Ok(record) => {
/// let user: User = record.to_record().unwrap();
/// println!("User: {:?}", user);
/// }
/// Err(err) => {
/// println!("Error: {:?}", err);
/// }
/// }
/// }
/// stream.cleanup().await;
/// }
/// ```
pub async fn streaming_search_with<'a>(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps something like steaming_search_paged would be a better name, as the caller is not allowed to directly specify the adapters.

Also, do we really need to consume self here?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that the consuming makes some sense given that we're using a pool 🤔.

How about the naming though @keaz?

Expand All @@ -513,15 +539,13 @@ impl LdapClient {
attributes: &'a Vec<&'a str>,
page_size: i32,
) -> Result<Stream<'a, &'a str, &'a Vec<&'a str>>, Error> {
let adapters: Vec<Box<dyn Adapter<_, _>>> = vec![
Box::new(EntriesOnly::new()),
Box::new(PagedResults::new(page_size)),
];
let search_stream = self
.ldap
.streaming_search_with(
PagedResults::new(page_size),
base,
scope,
filter.filter().as_str(),
attributes,
)
.streaming_search_with(adapters, base, scope, filter.filter().as_str(), attributes)
.await;

if let Err(error) = search_stream {
Expand Down Expand Up @@ -1190,10 +1214,13 @@ impl LdapClient {
}
}

/// The Stream struct is used to iterate through the search results.
/// The stream will return a Record object. The Record object can be used to map the search result to a struct.
/// After the stream is finished, the cleanup method should be called to cleanup the stream.
///
pub struct Stream<'a, S, A> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still only a "stream like object" (just like in ldap3). We want to return an object that implements the futures::stream::Stream trait. I confess that I haven't ever implemented this myself, but taking inspiration from page_turner::mt::PageTurner::pages_ahead it would seem like futures::stream::try_unfold would be able to do most of the work. Probably you can more or less just wrap the code you just wrote inside that.

What's the point you may ask? With Stream implemented we can then bring in StreamExt trait which allows us to use map, filter etc all the iterator like stuff on the stream.

Copy link
Owner Author

@keaz keaz Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StarlessNights ,Implemented Stream trait for Stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, going the hardcore way I see.

One more thing sprung to my mind. The stream object may be dropped at any point. We should implement Drop and try to abandon the search there, like you've done in next_inner. I suspect that it will be slightly awkward as it's not an async function, but perhaps something like async-dropper could help there.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't easy to implement async-dropper as it expects Eq to be implemented on the Stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah hopefully we'll get language support at some point. Perhaps just blocking for the abandoning would be an acceptable compromise. Honestly I don't know how angry the LDAP/AD servers get if you just leave a search dangling?

ldap: Object<Manager>,
search_stream: SearchStream<'a, S, A>,
cleanup_future: Option<BoxFuture<'a, ()>>,
}

impl<'a, S, A> Stream<'a, S, A>
Expand All @@ -1205,7 +1232,6 @@ where
Stream {
ldap,
search_stream,
cleanup_future: None,
}
}

Expand Down Expand Up @@ -1237,24 +1263,40 @@ where
}
}

pub async fn multi_valued_next<T: for<'b> serde::Deserialize<'b>>(
&mut self,
) -> Result<StreamResult<T>, Error> {
let entry = self.next_inner().await?;
match entry {
StreamResult::Record(entry) => {
let json = LdapClient::create_json_multi_value(entry).unwrap();
let data = LdapClient::map_to_struct::<T>(json);
if let Err(err) = data {
return Err(Error::Mapping(format!("Error mapping record: {:?}", err)));
}
return Ok(StreamResult::Record(data.unwrap()));
}
StreamResult::Done => Ok(StreamResult::Done),
StreamResult::Finished => Ok(StreamResult::Finished),
}
}

/// Cleanup the stream. This method should be called after the stream is finished.
/// This method will cleanup the stream and close the connection.
/// # Example
/// ```
/// use simple_ldap::LdapClient;
/// use simple_ldap::pool::LdapConfig;
///
/// async fn main(){
/// let ldap_config = LdapConfig {
/// bind_dn: "cn=manager".to_string(),
/// bind_pw: "password".to_string(),
/// ldap_url: "ldap://ldap_server:1389/dc=example,dc=com".to_string(),
/// pool_size: 10,
/// dn_attribute: None
/// };
///
/// let pool = pool::build_connection_pool(&ldap_config).await;
/// let mut ldap = pool.get_connection().await;
///
/// let mut stream = ldap.streaming_search("", self::ldap3::Scope::OneLevel, &name_filter, vec!["cn", "sn", "uid"], 3).await.unwrap();
/// while let Some(record) = stream.next().await {
/// match record {
/// Ok(record) => {
/// let user: User = record.to_record().unwrap();
/// println!("User: {:?}", user);
/// }
/// Err(err) => {
/// println!("Error: {:?}", err);
/// }
/// }
/// }
/// stream.cleanup().await;
/// }
/// ```
pub async fn cleanup(&mut self) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...Or manual cleanup isn't so bad either, as long as it's documented.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented the cleanup steps.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The print line got left in there. Probably it was just for debugging?

We probably shouldn't abandon searches that have been run to completion. How about checking the stream state and abandoning only on non-finished states?

The unwrap is kinda scary too. Perhaps just return a result?

println!("Cleaning up");
let _res = self.search_stream.finish().await;
Expand Down Expand Up @@ -1288,22 +1330,68 @@ where
return Poll::Ready(Some(Err(er)));
}
},
Poll::Pending => {
// if self.count == self.limit {
// return Poll::Ready(None);
// }
// Ensure the task is woken when the next record is ready
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
}
}

/// The Record struct is used to map the search result to a struct.
/// The Record struct has a method to_record which will map the search result to a struct.
/// The Record struct has a method to_multi_valued_record which will map the search result to a struct with multi valued attributes.
pub struct Record {
search_entry: SearchEntry,
}

impl Record {
/// Create a new Record object with single valued attributes
/// # Example
/// ```
/// use simple_ldap::LdapClient;
/// use simple_ldap::pool::LdapConfig;
/// use simple_ldap::filter::EqFilter;
///
/// #[derive(Debug, Deserialize)]
/// struct User {
/// uid: String,
/// cn: String,
/// sn: String,
/// }
///
/// async fn main(){
///
/// let ldap_config = LdapConfig {
/// bind_dn: "cn=manager".to_string(),
/// bind_pw: "password".to_string(),
/// ldap_url: "ldap://ldap_server:1389/dc=example,dc=com".to_string(),
/// pool_size: 10,
/// dn_attribute: None,
/// };
/// let pool = pool::build_connection_pool(&ldap_config).await;
/// let ldap = pool.get_connection().await.unwrap();
///
/// let name_filter = EqFilter::from("cn".to_string(), "James".to_string());
/// let attra = vec!["cn", "sn", "uid"];
/// let result = ldap.streaming_search(
/// "ou=people,dc=example,dc=com",
/// self::ldap3::Scope::OneLevel,
/// &name_filter,
/// &attra,).await;
///
/// let mut result = result.unwrap();
/// let mut count = 0;
/// while let Some(record) = result.next().await {
/// match record {
/// Ok(record) => {
/// let user = record.to_record::<User>().unwrap();
/// count += 1;
/// }
/// Err(_) => {
/// break;
/// }
/// }
/// }
/// result.cleanup().await;
/// }
pub fn to_record<T: for<'b> serde::Deserialize<'b>>(self) -> Result<T, Error> {
let json = LdapClient::create_json_signle_value(self.search_entry).unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to streams anymore but this intermediate conversion to JSON seems unnecessary? serde_json could probably be refactored out completely, but perhaps that's a matter for another PR.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is something that I want to improve. I will work on this in a separate PR.

let data = LdapClient::map_to_struct::<T>(json);
Expand Down Expand Up @@ -1690,6 +1778,7 @@ mod tests {
}
}
}
result.cleanup().await;
assert!(count == 2);
}

Expand Down
Loading