@@ -8,7 +8,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
88use futures:: stream:: { BoxStream , Fuse } ;
99use futures:: StreamExt ;
1010use indexmap:: IndexMap ;
11- use object_store:: list:: PaginatedListStore ;
11+ use object_store:: list:: { PaginatedListOptions , PaginatedListStore } ;
1212use object_store:: path:: Path ;
1313use object_store:: { ListResult , ObjectMeta , ObjectStore } ;
1414use pyo3:: exceptions:: { PyImportError , PyStopAsyncIteration , PyStopIteration , PyValueError } ;
@@ -24,7 +24,7 @@ use pyo3_object_store::{
2424} ;
2525use tokio:: sync:: Mutex ;
2626
27- enum MaybePaginatedListStore {
27+ pub ( crate ) enum MaybePaginatedListStore {
2828 SupportsPagination ( Arc < dyn PaginatedListStore > ) ,
2929 NoPagination ( Arc < dyn ObjectStore > ) ,
3030}
@@ -452,7 +452,7 @@ impl<'py> IntoPyObject<'py> for PyListResult {
452452#[ pyo3( signature = ( store, prefix=None , * , offset=None , chunk_size=50 , return_arrow=false ) ) ]
453453pub ( crate ) fn list (
454454 py : Python ,
455- store : PyObjectStore ,
455+ store : MaybePaginatedListStore ,
456456 prefix : Option < String > ,
457457 offset : Option < String > ,
458458 chunk_size : usize ,
@@ -470,12 +470,13 @@ pub(crate) fn list(
470470 . map_err ( |err| PyImportError :: new_err ( format ! ( "{msg}\n \n {err}" ) ) ) ?;
471471 }
472472
473- let store = store. into_inner ( ) . clone ( ) ;
474- let prefix = prefix. map ( |s| s. into ( ) ) ;
475- let stream = if let Some ( offset) = offset {
476- store. list_with_offset ( prefix. as_ref ( ) , & offset. into ( ) )
477- } else {
478- store. list ( prefix. as_ref ( ) )
473+ let stream = match store {
474+ MaybePaginatedListStore :: SupportsPagination ( paginated_store) => {
475+ create_paginated_stream ( paginated_store, prefix, offset, chunk_size)
476+ }
477+ MaybePaginatedListStore :: NoPagination ( object_store) => {
478+ create_filtered_stream ( object_store, prefix, offset)
479+ }
479480 } ;
480481 Ok ( PyListStream :: new ( stream, chunk_size, return_arrow) )
481482}
@@ -526,3 +527,102 @@ async fn list_with_delimiter_materialize(
526527 let list_result = store. list_with_delimiter ( prefix) . await ?;
527528 Ok ( PyListResult :: new ( list_result, return_arrow) )
528529}
530+
531+ fn create_paginated_stream (
532+ store : Arc < dyn PaginatedListStore > ,
533+ prefix : Option < String > ,
534+ offset : Option < String > ,
535+ chunk_size : usize ,
536+ ) -> BoxStream < ' static , object_store:: Result < ObjectMeta > > {
537+ // Create a stream that will fetch from the paginated store
538+ let stream = futures:: stream:: unfold (
539+ ( store, prefix, offset, None , true ) ,
540+ move |( store, prefix, offset, page_token, has_more) | async move {
541+ if !has_more {
542+ return None ;
543+ }
544+
545+ let opts = PaginatedListOptions {
546+ offset : offset. clone ( ) ,
547+ delimiter : None ,
548+ max_keys : Some ( chunk_size) ,
549+ page_token,
550+ ..Default :: default ( )
551+ } ;
552+
553+ match store. list_paginated ( prefix. as_deref ( ) , opts) . await {
554+ Ok ( result) => {
555+ let next_has_more = result. page_token . is_some ( ) ;
556+ let next_page_token = result. page_token ;
557+ let objects = result. result . objects ;
558+
559+ let next_state = ( store, prefix, offset, next_page_token, next_has_more) ;
560+ Some ( ( objects, next_state) )
561+ }
562+ Err ( _e) => {
563+ // TODO: propagate error
564+ // For errors, return empty list and stop
565+ Some ( ( Vec :: new ( ) , ( store, prefix, offset, None , false ) ) )
566+ }
567+ }
568+ } ,
569+ )
570+ . flat_map ( |objects| futures:: stream:: iter ( objects. into_iter ( ) . map ( Ok ) ) ) ;
571+
572+ Box :: pin ( stream)
573+ }
574+
575+ fn create_filtered_stream (
576+ store : Arc < dyn ObjectStore > ,
577+ prefix : Option < String > ,
578+ offset : Option < String > ,
579+ ) -> BoxStream < ' static , object_store:: Result < ObjectMeta > > {
580+ // For substring filtering, we need to split the prefix into:
581+ // 1. A directory prefix for efficient listing
582+ // 2. A substring filter to apply to the results
583+ let ( list_prefix, substring_filter) = if let Some ( prefix_str) = & prefix {
584+ if let Some ( ( dir_prefix, substring) ) = prefix_str. rsplit_once ( '/' ) {
585+ ( Some ( dir_prefix. to_string ( ) ) , Some ( substring. to_string ( ) ) )
586+ } else {
587+ ( None , Some ( prefix_str. clone ( ) ) )
588+ }
589+ } else {
590+ ( None , None )
591+ } ;
592+
593+ let prefix_path = list_prefix. map ( |s| s. into ( ) ) ;
594+ let base_stream = if let Some ( offset) = offset {
595+ store. list_with_offset ( prefix_path. as_ref ( ) , & offset. into ( ) )
596+ } else {
597+ store. list ( prefix_path. as_ref ( ) )
598+ } ;
599+
600+ // Apply substring filtering if needed
601+ let filtered_stream = if let Some ( substring) = substring_filter {
602+ Box :: pin ( base_stream. filter_map ( move |result| {
603+ let substring = substring. clone ( ) ;
604+ async move {
605+ match result {
606+ Ok ( meta) => {
607+ // Extract filename from path for substring matching
608+ let path_str = meta. location . as_ref ( ) ;
609+ if let Some ( filename) = path_str. split ( '/' ) . last ( ) {
610+ if filename. contains ( & substring) {
611+ Some ( Ok ( meta) )
612+ } else {
613+ None
614+ }
615+ } else {
616+ Some ( Ok ( meta) )
617+ }
618+ }
619+ Err ( e) => Some ( Err ( e) ) ,
620+ }
621+ }
622+ } ) )
623+ } else {
624+ base_stream
625+ } ;
626+
627+ filtered_stream
628+ }
0 commit comments