Skip to content

Commit

Permalink
ReactorUtils, refinements.
Browse files Browse the repository at this point in the history
  • Loading branch information
marco-brandizi committed Jul 2, 2024
1 parent 051eb80 commit a914eb8
Showing 1 changed file with 45 additions and 25 deletions.
70 changes: 45 additions & 25 deletions src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,6 @@
*/
public class ReactorUtils
{
/**
* {@link Schedulers#newBoundedElastic(int, int, String)} with the {@link Schedulers#DEFAULT_BOUNDED_ELASTIC_SIZE default threadCap}
* and a low limit for queuedTaskCap. This is suitable for cases where the source is
* much faster than the downstream processing and hence there is little point with queueing
* too much stuff.
*
*/
public static final Scheduler DEFAULT_FLUX_SCHEDULER = newBoundedElastic (
DEFAULT_BOUNDED_ELASTIC_SIZE, 100,
"jutils.batchSched"
);

/**
* This has been tested in tasks like saving data on a database.
*/
public static final int DEFAULT_BATCH_SIZE = 2500;

/**
* Little helper to build a common {@link ParallelFlux} to process a source of items
* in parallel batches.
Expand All @@ -50,6 +33,24 @@ public class ReactorUtils
*/
public static class ParallelBatchFluxBuilder<T, B extends Collection<T>>
{
/**
* {@link Schedulers#newBoundedElastic(int, int, String)} with the {@link Schedulers#DEFAULT_BOUNDED_ELASTIC_SIZE default threadCap}
* and a low limit for queuedTaskCap. This is suitable for cases where the source is
* much faster than the downstream processing and hence there is little point with queueing
* too much stuff.
*
*/
public static final Scheduler DEFAULT_FLUX_SCHEDULER = newBoundedElastic (
DEFAULT_BOUNDED_ELASTIC_SIZE, 100,
"jutils.batchSched"
);

/**
* This has been tested in tasks like saving data on a database.
*/
public static final int DEFAULT_BATCH_SIZE = 2500;


private Flux<T> flux;
private Scheduler scheduler = DEFAULT_FLUX_SCHEDULER;
private int batchSize = DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ParallelFlux<B> build ()
}

/**
* Default is {@link ReactorUtils#DEFAULT_FLUX_SCHEDULER}.
* Default is {@link #DEFAULT_FLUX_SCHEDULER}.
*/
public ParallelBatchFluxBuilder<T, B> withScheduler ( Scheduler scheduler )
{
Expand All @@ -92,7 +93,7 @@ public ParallelBatchFluxBuilder<T, B> withScheduler ( Scheduler scheduler )
}

/**
* Default it {@link ReactorUtils#DEFAULT_BATCH_SIZE}.
* Default it {@link #DEFAULT_BATCH_SIZE}.
*/
public ParallelBatchFluxBuilder<T, B> withBatchSize ( int batchSize )
{
Expand All @@ -112,6 +113,28 @@ public ParallelBatchFluxBuilder<T, B> withBatchSupplier ( Supplier<? extends Col
} // class ParallelBatchFluxBuilder


/**
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
*/
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Flux<? extends T> flux ) {
return new ParallelBatchFluxBuilder<T, List<T>> ( flux ).build ();
}

/**
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
*/
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Stream<? extends T> stream ) {
return new ParallelBatchFluxBuilder<T, List<T>> ( stream ).build ();
}

/**
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
*/
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Collection<? extends T> collection ) {
return new ParallelBatchFluxBuilder<T, List<T>> ( collection ).build ();
}


/**
* Uses {@link ParallelBatchFluxBuilder} to process a source of batches.
*/
Expand All @@ -134,8 +157,7 @@ public static <T> void batchProcessing (
Flux<T> flux, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T,List<T>> ( flux ).build ();
batchProcessing ( parFlux, task );
batchProcessing ( parallelBatchFlux ( flux ), task );
}

/**
Expand All @@ -145,8 +167,7 @@ public static <T> void batchProcessing (
Stream<T> stream, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T, List<T>> ( stream ).build ();
batchProcessing ( parFlux, task );
batchProcessing ( parallelBatchFlux ( stream ), task );
}

/**
Expand All @@ -156,8 +177,7 @@ public static <T> void batchProcessing (
Collection<T> collection, Consumer<List<T>> task
)
{
ParallelFlux<List<T>> parFlux = new ParallelBatchFluxBuilder<T,List<T>> ( collection ).build ();
batchProcessing ( parFlux, task );
batchProcessing ( parallelBatchFlux ( collection ), task );
}

}

0 comments on commit a914eb8

Please sign in to comment.