Releases: orxfun/orx-parallel

Recursive Parallel Iterators

02 Nov 20:31
a857613

Changes

Recursive Parallel Iterators

The IntoParIterRec trait can be used to create a parallel recursive iterator over an initial set of elements. This is useful when working with non-linear data structures such as trees and graphs.

Consider, for instance, a tree which is defined by the following node struct:

pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}

Assume that we want to map all the data with map: impl Fn(T) -> u64 and compute the sum of mapped values of all nodes descending from a root: &Node.

We can express this computation and execute in parallel with the following:

fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    queue.extend(&node.children);
}

[root].into_par_rec(extend).map(map).sum()

Instead of into_par, we use into_par_rec and provide extend function as its argument. This function defines the recursive extension of the parallel iterator such that every time we process a node we first add its children to the queue. Queue is the queue of elements to be processed and it exposes two growth methods to define the recursive extension: push and extend.

Although we create the parallel iterator differently, we get a ParIter. Therefore, we have access to all features of a regular parallel iterator.

For instance, assume we want to filter nodes first. Further, instead of summing up the mapped values, we need to collect them in a vector. We can express this computation just as we would do on a linear data structure:

[root].into_par_rec(extend).filter(filter).map(map).collect()

For more details, you may see the parallelization_on_tree example.
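
To make the semantics of the recursive extension concrete, the following is a minimal sequential sketch that uses only the standard library. It is not the crate's implementation: sum_mapped and sample_tree are hypothetical names, a VecDeque stands in for the Queue, and map takes &T here for simplicity. It computes what the parallel expression above is meant to compute, one node at a time.

```rust
use std::collections::VecDeque;

pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}

/// Sequential reference: the queue is seeded with `root`; every time a node
/// is processed, its children are first added to the queue.
pub fn sum_mapped<T>(root: &Node<T>, map: impl Fn(&T) -> u64) -> u64 {
    let mut queue: VecDeque<&Node<T>> = VecDeque::from([root]);
    let mut sum = 0;
    while let Some(node) = queue.pop_front() {
        queue.extend(&node.children); // the "recursive extension" step
        sum += map(&node.data);
    }
    sum
}

/// Hypothetical helper that builds a small tree for illustration.
pub fn sample_tree() -> Node<u64> {
    let leaf = |data: u64| Node { data, children: vec![] };
    Node {
        data: 1,
        children: vec![
            Node { data: 2, children: vec![leaf(4), leaf(5)] },
            leaf(3),
        ],
    }
}
```

With the identity map, sum_mapped(&sample_tree(), |x| *x) visits all five nodes and adds up their data.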

Diagnostics

The ParallelExecutorWithDiagnostics executor is added. Any parallel executor can be converted into one with diagnostics. This executor is meant to be used for testing parallel computations and for understanding how the workload is distributed to threads. During the parallel computation, it collects diagnostics about:

  • how many threads are used for the parallel computation
  • how many times each thread received tasks
  • average chunk size; i.e., average number of tasks that each thread received per assignment
  • and finally, explicit chunk sizes for the first task assignments.

These metrics are printed on the stdout once the parallel computation is completed. Therefore, it is not meant to be used for production.

Running a parallel computation with diagnostics is convenient.

let sum = range
    .par()
    .with_runner(DefaultRunner::default().with_diagnostics()) // this line enables diagnostics
    .map(|x| x + 1)
    .filter(|x| x.is_multiple_of(2))
    .sum();
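
To illustrate the kind of data such diagnostics are based on, here is a rough standard-library sketch. It does not use or reproduce ParallelExecutorWithDiagnostics: chunks_pulled_per_thread is a hypothetical function, and the fixed chunk size is an assumption. Each thread counts how many chunks it managed to claim from a shared atomic cursor.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Returns one entry per thread: the number of chunks that thread pulled.
pub fn chunks_pulled_per_thread(len: usize, chunk: usize, num_threads: usize) -> Vec<usize> {
    assert!(chunk > 0 && num_threads > 0);
    let next = AtomicUsize::new(0); // position of the next unclaimed input
    thread::scope(|s| {
        let mut handles = Vec::with_capacity(num_threads);
        for _ in 0..num_threads {
            handles.push(s.spawn(|| {
                let mut pulls = 0usize;
                // claim [begin, begin + chunk) until the inputs run out
                while next.fetch_add(chunk, Ordering::Relaxed) < len {
                    pulls += 1;
                }
                pulls
            }));
        }
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

The individual counts vary from run to run, but they always add up to the total number of chunks.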

Related Issues

This does not exactly fix #104, but it provides a solution with an approach that is probably different from the one intended. Please also see the related computation experiments.

edit after second iteration

Fixes #104

Thanks to @davidlattimore for suggestions and feedback on the api.

Enable thread pools & no-std

25 Sep 07:53
1b0bf95

Towards thread pools

This PR redefines the parallel runner as follows.

This crate defines parallel computation by combining two basic components.

Pulling inputs

  • Pulling inputs in parallel is achieved through ConcurrentIter. Concurrent iterator implementations are lock-free, efficient and support pull-by-chunks optimization to reduce the parallelization overhead. A thread can pull any number of inputs from the concurrent iterator every time it becomes idle. This provides the means to dynamically decide on the chunk sizes.
  • Furthermore, this reduces the overhead of defining and creating tasks. To illustrate, provided that the computation will be handled by n threads, a closure holding a reference to the input concurrent iterator is defined to represent the computation. This same closure is passed to n threads; i.e., n spawn calls are made. Each of these threads keeps pulling elements from the input until the computation is completed, without the need to define another task.
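
The two points above can be sketched with the standard library alone. This is a simplified model and not the crate's code: par_sum is a hypothetical function, an AtomicUsize cursor over a slice stands in for ConcurrentIter, and the chunk size is fixed rather than dynamic.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::thread;

pub fn par_sum(inputs: &[u64], num_threads: usize, chunk: usize) -> u64 {
    assert!(num_threads > 0 && chunk > 0);
    let next = AtomicUsize::new(0); // stand-in for the concurrent iterator
    let total = AtomicU64::new(0);

    // One closure describes the whole computation; it only holds references.
    let work = || loop {
        let begin = next.fetch_add(chunk, Ordering::Relaxed); // pull a chunk
        if begin >= inputs.len() {
            break; // nothing left to pull
        }
        let end = (begin + chunk).min(inputs.len());
        let partial: u64 = inputs[begin..end].iter().sum();
        total.fetch_add(partial, Ordering::Relaxed);
    };

    thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(&work); // the same closure is handed to every thread
        }
    });
    total.into_inner()
}
```

Only num_threads spawn calls are made regardless of the input length; the remaining coordination happens through the shared cursor.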

Writing outputs

  • When we collect results, writing outputs is handled using lock-free containers such as ConcurrentBag and ConcurrentOrderedBag which aim for high performance collection of results.

There are two main decisions to be taken while executing these components:

  • how many threads do we use?
  • what is the chunk size; i.e., how many input items does a thread pull each time?

A ParallelRunner is a combination of a ParThreadPool and a ParallelExecutor that are responsible for these decisions, respectively.

ParThreadPool: number of threads

ParThreadPool trait generalizes thread pools that can be used for parallel computations. This allows the parallel computation to be generic over thread pools.

When not explicitly set, DefaultPool is used:

  • When std feature is enabled, default pool is the StdDefaultPool. In other words, all available native threads can be used by the parallel computation. This number can be globally bounded by "ORX_PARALLEL_MAX_NUM_THREADS" environment variable when set.
  • When working in a no-std environment, default pool is the SequentialPool. As the name suggests, this pool executes the parallel computation sequentially on the main thread. It can be considered as a placeholder to be overwritten by with_pool or with_runner methods to achieve parallelism.

Note that the thread pool defines the resource, or upper bound. This upper bound can further be bounded by the num_threads configuration. Finally, the parallel executor might choose not to use all available threads if it decides that the computation is small enough.
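
One way to picture how these bounds compose is to take their minimum. The sketch below is an assumption about the composition, not the crate's actual logic; effective_max_threads, env_bound and native_threads are hypothetical helpers, and treating a zero value as "no bound" is also an assumption.

```rust
use std::thread;

/// Combines the pool's capacity with the optional bounds by taking the minimum.
/// Assumption: a bound of zero means "not bounded".
pub fn effective_max_threads(
    pool_max: usize,
    env_bound: Option<usize>,
    num_threads: Option<usize>,
) -> usize {
    [env_bound, num_threads]
        .iter()
        .flatten()
        .filter(|n| **n > 0)
        .fold(pool_max.max(1), |bound, n| bound.min(*n))
}

/// Reads the environment variable named in the text; an unset or unparsable
/// value imposes no bound.
pub fn env_bound() -> Option<usize> {
    std::env::var("ORX_PARALLEL_MAX_NUM_THREADS").ok()?.parse().ok()
}

/// Number of native threads reported by the standard library (at least 1).
pub fn native_threads() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

For instance, effective_max_threads(native_threads(), env_bound(), Some(4)) never exceeds 4.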

To overwrite the defaults and explicitly set the thread pool to be used for the computation, with_pool or with_runner methods are used.

use orx_parallel::*;

let inputs: Vec<_> = (0..42).collect();

// uses the DefaultPool
// assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
let sum = inputs.par().sum();

// equivalent to:
let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
assert_eq!(sum, sum2);

#[cfg(feature = "scoped_threadpool")]
{
    let mut pool = scoped_threadpool::Pool::new(8);
    // uses the scoped_threadpool::Pool created with 8 threads
    let sum2 = inputs.par().with_pool(&mut pool).sum();
    assert_eq!(sum, sum2);
}

#[cfg(feature = "rayon-core")]
{
    let pool = rayon_core::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    // uses the rayon_core::ThreadPool created with 8 threads
    let sum2 = inputs.par().with_pool(&pool).sum();
    assert_eq!(sum, sum2);
}

#[cfg(feature = "yastl")]
{
    let pool = YastlPool::new(8);
    // uses the yastl::Pool created with 8 threads
    let sum2 = inputs.par().with_pool(&pool).sum();
    assert_eq!(sum, sum2);
}

ParThreadPool implementations of several thread pools are provided in this crate as optional features (see features). Provided that the pool supports scoped computations, it is trivial to implement this trait in most cases (see implementations for examples).

In most of the cases, rayon-core, scoped_threadpool and scoped_pool perform better than others, and get close to native threads performance with StdDefaultPool.

Since parallel computations are generic over the thread pools, performances can be conveniently compared for specific use cases. Such an example benchmark can be found in collect_filter_map file. To have quick tests, you may also use the example benchmark_pools.

ParallelExecutor: chunk size

Once thread pool provides the computation resources, it is ParallelExecutor's task to distribute work to available threads. As mentioned above, all threads receive exactly the same closure. This closure continues to pull elements from the input concurrent iterator and operate on the inputs until all elements are processed.

The critical decision that the parallel executor makes is the chunk size. Depending on the state of the computation, it can dynamically decide on the number of elements to pull from the input iterator. The tradeoff it tries to balance is as follows:

  • the larger the chunk size,
    • the smaller the parallelization overhead; but also
    • the larger the risk of imbalance in cases of heterogeneity.
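
The tradeoff can be illustrated with a small deterministic simulation. This is a toy model with a hypothetical simulate function, not the executor's algorithm: chunks have a fixed size, the worker that becomes idle first pulls the next chunk, and the cost of a pull itself is not modeled beyond counting the pulls.

```rust
/// Simulates `num_threads` workers that pull fixed-size chunks whenever idle.
/// Returns (number of pulls, finishing time of the slowest worker).
pub fn simulate(costs: &[u64], chunk: usize, num_threads: usize) -> (usize, u64) {
    assert!(chunk > 0 && num_threads > 0);
    let mut busy_until = vec![0u64; num_threads];
    let mut pulls = 0;
    for piece in costs.chunks(chunk) {
        // the worker that becomes idle first pulls the next chunk
        let idle = busy_until.iter_mut().min().unwrap();
        *idle += piece.iter().sum::<u64>();
        pulls += 1;
    }
    (pulls, busy_until.into_iter().max().unwrap())
}
```

With the heterogeneous costs [1, 1, 1, 1, 1, 1, 10, 10] and two workers, a chunk size of 1 needs 8 pulls and finishes at time 13, while a chunk size of 4 needs only 2 pulls but finishes at time 22 because both expensive items land in the same chunk.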

Features

With this PR, the crate is converted into a no-std crate.

  • std: This is a no-std crate while std is included as a default feature. Please use --no-default-features flag for no-std use cases. std feature enables StdDefaultPool as the default thread provider which uses native threads.
  • rayon-core: This feature enables using rayon_core::ThreadPool for parallel computations.
  • scoped_threadpool: This feature enables using scoped_threadpool::Pool.
  • scoped-pool: This feature enables using scoped_pool::Pool.
  • yastl: This feature enables using yastl::Pool.
  • pond: This feature enables using pond::Pool.
  • poolite: This feature enables using poolite::Pool.

Breaking Change

The changes to the ParallelRunner trait are breaking changes if you have been using the with_runner transformation. However, prior to thread pools, this transformation was used mostly as an internal experimentation and benchmarking tool. None of the tests, examples or benchmarks are broken.

Target Issues

This PR aims to address the pre-requisites

  • for issue #104 , and
  • for discussion #77

Fixes #82

Chain

08 Sep 07:25
966f5f6

Enables chaining parallel iterators.

inputs.into_par().chain(inputs).map(map).collect()

Currently, the first iterator of the chain is required to have an exact length, as demonstrated in the example below:

let a = vec!['a', 'b', 'c']; // with exact len
let b = vec!['d', 'e', 'f'].into_iter().filter(|x| *x != 'x'); // does not require to have known length
let chain = a.into_par().chain(b.iter_into_par());
assert_eq!(
    chain.collect::<Vec<_>>(),
    vec!['a', 'b', 'c', 'd', 'e', 'f'],
);

The requirement is due to the following:

  • While defining parallel iterators, the computations are lazily composed of smaller parts. We do not know for sure how the computation will conclude.
  • Therefore, we have to assume all possible cases, including ordered iteration. For instance, it is very likely that the computation will conclude with an ordered collection.
  • In order to achieve ordered results, at some point, we have to dynamically know the length of the first iterator.
  • This adds an additional layer of atomic book keeping that requires a very careful implementation for both safety and for achieving a performant solution.

Therefore, the current implementation allows for chaining an iterator with a known length with other iterators. The benchmarks show that chained iterators achieve performance improvements similar to those of non-chained iterators.
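
The role of the first iterator's length in ordered results can be seen in a simplified standard-library sketch. This is not how the crate implements chain: chain_ordered is a hypothetical function, and both inputs are slices here to keep the example short. The position at which the second input starts in the output is exactly the length of the first input, so both halves can be written concurrently once that length is known.

```rust
use std::thread;

pub fn chain_ordered<T: Copy + Send + Sync + Default>(a: &[T], b: &[T]) -> Vec<T> {
    let mut out = vec![T::default(); a.len() + b.len()];
    // The split position is exactly the length of the first input.
    let (first, second) = out.split_at_mut(a.len());
    thread::scope(|s| {
        s.spawn(|| first.copy_from_slice(a)); // fills positions 0..a.len()
        s.spawn(|| second.copy_from_slice(b)); // fills positions a.len()..
    });
    out
}
```

Without a known length for the first input, the offset of the second half would have to be discovered while the computation is running, which is the bookkeeping described in the last bullet.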

Fallible iterators, take-map while

29 Aug 08:48
b9f6517

Changes

Fallible Parallel Iterators

We enjoy rust's ? operator when working with fallible computations. It allows us to focus on and code only the success path. Failure at any step of the computation leads to a short-circuit immediately returning from the function.

fn try_to_parse() -> Result<i32, std::num::ParseIntError> {
    let x: i32 = "123".parse()?; // x = 123
    let y: i32 = "24a".parse()?; // returns an Err() immediately
    Ok(x + y)                    // Doesn't run.
}

However, we do not have this convenience while working with iterators.

collect is the only exception. Normally, it allows us to pick the container to collect the items into.

let into_vec: Vec<usize> = (0..10).collect();
let into_set: std::collections::HashSet<usize> = (0..10).collect();

But it also does something exceptional when the item type is a result:

  • The first computation below is similar to the ones above; it simply collects each element into the container, which is defined as a vector.
  • The second computation, however, is fundamentally different. It collects elements iff all elements are of the Ok variant. Further, it short-circuits the computation as soon as an Err is observed. This is exactly how the ? operator works, for a series of computations rather than one.

let into_vec_of_results: Vec<Result<usize, char>> = (0..10).map(|x| Ok(x)).collect();
let into_result_of_vec: Result<Vec<usize>, char> = (0..10).map(|x| Ok(x)).collect();

Although convenient, this change in the behavior of collect might be considered unexpected, at least for me.

Further, we do not have short-circuiting methods for computations other than collect. For instance, it is not as convenient to compute the sum of numbers of an iterator provided that all elements are of the Ok variant, or to receive the error otherwise.
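
For reference, one way to write such an early-exiting sum with sequential standard-library iterators is try_fold. The function name sum_parsed below is hypothetical; the sketch only shows what the desired behavior looks like when spelled out by hand.

```rust
use std::num::ParseIntError;

/// Sums the parsed values; stops at the first parse error and returns it.
pub fn sum_parsed(inputs: &[&str]) -> Result<i32, ParseIntError> {
    inputs
        .iter()
        .try_fold(0, |acc, s| s.parse::<i32>().map(|x| acc + x))
}
```

The success path (acc + x) and the failure handling are interleaved in the same closure, which is the kind of boilerplate that an explicit fallible iterator aims to remove.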

In general, the requirement to early exit in fallible computation is common and important both for performance and convenience.

For parallel computation, this crate proposes to explicitly transform an iterator with fallible elements into a fallible parallel iterator.

use orx_parallel::*;
use std::num::ParseIntError;

let collect: Result<Vec<i32>, ParseIntError> = vec!["7", "2", "34"]
    .into_par()
    .map(|x| x.parse::<i32>())
    .into_fallible_result() // <-- explicit transformation to fallible iterator
    .collect();

Currently, there exist two fallible parallel iterators ParIterResult and ParIterOption. The transformation is as follows:

Regular Iterator            | Transformation Method   | Fallible Iterator
ParIter<Item=Result<T, E>>  | into_fallible_result()  | ParIterResult<Item=T, Error=E>
ParIter<Item=Option<T>>     | into_fallible_option()  | ParIterOption<Item=T>

After converting into a fallible iterator, each chaining transformation is based on the success item type. Similar to ? operator, this allows us to focus on and implement the success path while any error case will be handled by early returning from the iterator with the error.

use orx_parallel::*;
use std::num::ParseIntError;

let sum: Result<i32, ParseIntError> = vec!["7", "2", "34"]
    .into_par()
    .map(|x| x.parse::<i32>()) // Item = Result<i32, ParseIntError>
    .into_fallible_result() // we are only working with success type after this point
    .map(|x| x + 1)
    .filter(|x| x % 2 == 0)
    .flat_map(|x| [x, x + 1, x + 2])
    .sum(); // returns Result, rather than i32
assert_eq!(sum, Ok(27));

let sum: Result<i32, ParseIntError> = vec!["7", "!!!", "34"]
    .into_par()
    .map(|x| x.parse::<i32>())
    .into_fallible_result()
    .map(|x| x + 1)
    .filter(|x| x % 2 == 0)
    .flat_map(|x| [x, x + 1, x + 2])
    .sum();
assert!(sum.is_err());

As demonstrated above, not only collect but all computation methods return a Result.

To summarize:

  • We can use all iterator methods with fallible iterators as well.
  • The transformations are based on the success type. All computations return a Result:
    • if all computations succeed, it is Ok of the value that an infallible iterator would return;
    • it is the first discovered Err if any of the computations fail.
  • Finally, all computations immediately return if any of the elements fail to compute.

Optional fallible iterator behaves exactly the same, except that None is treated as the failure case.
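
As a point of comparison, the sequential standard-library analogue of treating None as the failure case is collecting into an Option. The sketch below uses a hypothetical scaled_reciprocals function and does not involve ParIterOption.

```rust
/// Divides 1000 by every value; yields None as soon as a division by zero occurs.
pub fn scaled_reciprocals(values: &[u32]) -> Option<Vec<u32>> {
    values.iter().map(|x| 1000u32.checked_div(*x)).collect()
}
```

Here scaled_reciprocals(&[1, 2, 4]) succeeds with all quotients, whereas a single zero among the inputs turns the whole result into None.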

TakeWhile and MapWhile

Similar to fallible parallel iterators, take_while and map_while methods allow for short-circuiting computations. Unlike regular sequential iterators, the result is not always deterministic due to parallel execution. However, as demonstrated in the example below, collect with ordered execution makes sure that all elements before the first one that stops the iteration (the predicate of take_while returns false, or the closure of map_while returns None) are collected in the correct order; and hence, the result is deterministic.

use orx_parallel::*;

let iter = (0..10_000).par().take_while(|x| *x != 5_000); // takes until reaching 5000
let b: Vec<_> = iter.collect();
assert_eq!(b, (0..5_000).collect::<Vec<_>>());

let iter = (0..10_000)
    .par()
    .map(|x| x as i32 - 5_000) // -5_000..5_000
    .map_while(|x| 16i32.checked_div(x)); // maps until reaching 0, which fails
let b: Vec<_> = iter.collect();

assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());

Simplification of Computational Variants

Prior, all computations defined by iterators had been represented by four computational variants: par (empty), map, xap and xap-filter-xap. With the revision, the first three variants suffice to represent computations, allowing us to avoid more complex xap-filter-xap computation. The revision has no impact on performance and benchmarks.

Following up on computational simplifications #83, the Values abstraction is significantly simplified. This makes it possible to implement new useful value types to model the while transformation.

Mutable Parallel Computation and Relaxed Trait Bounds

08 Aug 14:48
bdb4439

Changes

Send and Sync bounds revised and relaxed.

Fixes #68

Parallel Computation over Mutable References

  • Generally, any ConcurrentIter<Item = &mut T> can be converted into a ParIter<Item = &mut T>. Therefore, the following program is valid:

let mut vec = vec![1, 2, 3];
let slice = vec.as_mut_slice(); // &mut [T]: IntoConcurrentIter<Item = &mut T>
                                // => &mut [T]: IntoParIter<Item = &mut T>
let par = slice.into_par();
par.filter(|x| **x != 42).for_each(|x| *x *= 0);

  • ParallelizableCollectionMut trait is defined as a parallel counterpart of the CollectionMut trait. This enables the par_mut method on the data source, which can be used as follows:

let mut vec = vec![1, 2, 3]; // Vec<T>: ParallelizableCollectionMut
let par = vec.par_mut();
par.filter(|x| **x != 42).for_each(|x| *x *= 0);

  • Any arbitrary sequential Iterator can be converted into a parallel iterator using the iter_into_par method. This holds true even if the iterator implements Iterator<Item = &mut T> for some T. Therefore, any iterator over mutable references can also be parallelized as follows:

let mut map: HashMap<_, _> = (0..N).map(|x| (x.to_string(), x)).collect();
let iter = map.values_mut(); // iter: impl Iterator<Item = &mut usize>

let par = iter.iter_into_par(); // par: impl ParIter<Item = &mut usize>
par.filter(|x| **x != 42).for_each(|x| *x *= 0);

let sum = map.values().iter_into_par().sum();
assert_eq!(sum, 42);

Fixes #76

Using Transformation and Global Thread Bound Config

05 Aug 05:21
05d9295

Using Transformation

Using transformation is introduced: ParIter => using U => ParIterUsing<U>.

Most of the regular Iterator methods can mutably capture a variable from the scope. In order to be able to do this, the iterator methods accept FnMut. Since the computation will be handled sequentially, the captured variable will be mutated sequentially without a race condition.

This is clearly not safe with a parallel iterator. Computations will be handled by multiple threads and multiple threads must not capture a mutable reference of the same variable. This makes certain programs not representable by ParIter.

However, ParIterUsing<U> allows for parallel computations with an additional mutable reference to U.
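
The idea can be sketched with scoped threads from the standard library. This is a simplified model under stated assumptions, not the crate's ParIterUsing: for_each_using is a hypothetical function, inputs are pulled one at a time from a slice, and each thread creates its own value of U from its thread index.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Every thread creates its own `U` and passes `&mut U` to `f` for each input
/// it pulls. No two threads ever share the same `U`.
pub fn for_each_using<T: Sync, U>(
    inputs: &[T],
    num_threads: usize,
    using: impl Fn(usize) -> U + Sync,
    f: impl Fn(&mut U, &T) + Sync,
) {
    let next = AtomicUsize::new(0);
    thread::scope(|s| {
        for thread_idx in 0..num_threads {
            let (next, using, f) = (&next, &using, &f);
            s.spawn(move || {
                let mut state = using(thread_idx); // owned by this thread only
                loop {
                    let i = next.fetch_add(1, Ordering::Relaxed);
                    match inputs.get(i) {
                        Some(x) => f(&mut state, x),
                        None => break,
                    }
                }
            });
        }
    });
}
```

Since the mutable value never leaves the thread that created it, U needs neither Send nor Sync in this sketch, and the closure f itself can remain an Fn.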

The following simple example demonstrates this for the common random number generator use case:

let input: Vec<u64> = (1..N).collect();

input
    .into_par() // ParIter
    .using(|thread_idx| ChaCha20Rng::seed_from_u64(thread_idx as u64)) // transform into ParIterUsing<ChaCha20Rng>
    .map(|rng, i| fibonacci(i) % 1000 + 1) // rng: &mut ChaCha20Rng is available
    .filter(|rng, i| rng.random_bool(0.4)) // with all parallel iterator methods
    .map(|rng, i| rng.random_range(0..i))
    .sum()

And channels are used in the following example:

let (sender, receiver) = channel();

(0..5)
    .into_par()
    .using_clone(sender) // each thread gets a clone of the sender
    .for_each(|s, x| s.send(x).unwrap()); // s: &mut Sender<usize> is available to iterator methods

let mut res: Vec<_> = receiver.iter().collect();

res.sort();

assert_eq!(&res[..], &[0, 1, 2, 3, 4])

Detailed documentation about the using transformation can be found here: https://github.com/orxfun/orx-parallel/blob/release-v2.4.0/docs/using.md.

Note that this is a safe and generalized approach to deal with the mutability limitations of parallel computations through explicit definition of the value that can be mutated by each thread.

Fixes #64

Fixed Chunk Runner Performance

Avoids lags in fixed chunk parallel runner.

Benchmarks do not show any worsening in results.

Fixes #61

Global Configuration on Number of Threads

Maximum number of threads that can be used by parallel computations can be bounded by the environment variable ORX_PARALLEL_MAX_NUM_THREADS.

Fixes #74

Relax Sync constraint on slices

04 Aug 07:06
50bc7b7

Relaxes Send requirements on T from concurrent and parallel iterators which yield elements of type &T. Notice that &T auto implements Send provided that T implements Sync. Therefore, T: Send requirement is unnecessary and irrelevant.

This version brings in fixes in the downstream libraries, which together fix the problem in parallelization.

As an example case of the fix, the following function did not compile in the prior version due to Send requirement. It compiles in this version, as it should.

fn fun<T: Sync>(slice: &[T]) {
    let _iter = slice.par();
}
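
The same reasoning can be checked with the standard library alone. In the sketch below, count_matching is a hypothetical function; it shares halves of a slice with two scoped threads and compiles with only a T: Sync bound, because the threads receive &[T] and never an owned T.

```rust
use std::thread;

/// Counts the elements satisfying `pred` using two threads.
/// Only `T: Sync` is required: `&T` is `Send` whenever `T` is `Sync`.
pub fn count_matching<T: Sync>(slice: &[T], pred: impl Fn(&T) -> bool + Sync) -> usize {
    let (left, right) = slice.split_at(slice.len() / 2);
    let pred = &pred;
    thread::scope(|s| {
        let l = s.spawn(move || left.iter().filter(|x| pred(*x)).count());
        let r = s.spawn(move || right.iter().filter(|x| pred(*x)).count());
        l.join().unwrap() + r.join().unwrap()
    })
}
```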

Parallel Draining Iterator

13 Jun 21:08
4423c6a

Parallel Draining Iterator

  • ParallelDrainableOverSlice trait is defined with par_drain(&mut self, range) method. This method returns a parallel draining iterator which yields owned elements which are removed from the original collection. It can be considered as the parallel counterpart of Vec::drain(&mut self, range).
  • Any type implementing ConcurrentDrainableOverSlice automatically implements ParallelDrainableOverSlice. In other words, if the type can create a concurrent draining iterator, the computation can be parallelized. In the current referenced version of orx-concurrent-iter, Vec implements ConcurrentDrainableOverSlice; and hence, automatically implements ParallelDrainableOverSlice.
  • Benchmarks on parallel draining iterators are implemented and reported in the documentation. Results show that the parallel draining iterator created from standard vector is highly efficient.
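
For readers less familiar with the sequential counterpart mentioned above, this is what Vec::drain does in the standard library. The wrapper drain_range is a hypothetical name used only for this illustration; it does not use par_drain.

```rust
use std::ops::Range;

/// Removes the elements in `range` from `vec` and returns them as owned values.
pub fn drain_range<T>(vec: &mut Vec<T>, range: Range<usize>) -> Vec<T> {
    vec.drain(range).collect()
}
```

After draining 1..4 from a vector holding 0 through 5, the returned vector contains 1, 2, 3 and the original vector keeps 0, 4, 5.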

Direct Parallelization of VecDeque, SplitVec and FixedVec

20 May 08:38
adb876c

Upgrades the concurrent iterator dependency which will enable parallelization of the following additional collections:

  • alloc::collections::VecDeque
  • orx_split_vec::SplitVec
  • orx_fixed_vec::FixedVec

v2: Redesign parallelization traits

04 May 14:33
0fd9607

Parallel computation through iterators is re-designed.

  • Updates in the downstream crates are integrated; in particular, this crate benefits from the improvements in v2.0 of the ConcurrentIter crate.
  • Functional compositions are re-designed. This led to improvements in performance, as well as simplification of the code and implementations.
    • In order to achieve this, the required abstractions over values and arrays (in the mathematical sense) are defined. This allows us to operate on a value or a collection of values using the same generic code, while the actual implementation is monomorphized and optimized under the hood.
    • With this design, we are required to have only four generic computational variants of the iterators without loss of performance:
      • empty -> this is equivalent to the input concurrent iterator source, no computation is defined over it yet.
      • map -> this is a map operation on the input in the classical sense
      • xap -> this on the other hand is a map with superpowers, it is a generalization of (i) one-to-one map, (ii) filter-map and (iii) flat-map operations.
      • xap-filter-xap -> finally, this is a sequence of xap, followed by a filter, followed by another xap.
    • These four variants are sufficient to represent all possible iterator compositions without loss of performance due to abstraction.
  • Significant performance improvements are achieved and reported.
  • The concepts defined in orx_iterable are brought to this crate defining traits such as IntoParIter, Parallelizable and ParallelizableCollection.
  • Extensibility of input collections through concurrent iterators is defined.
  • ParallelRunner is converted into a trait. This allows for customization of the parallel executor. Therefore, it enables improvements in performance in parallel workstreams. Meanwhile, a performant default parallel executor is provided.
  • The complexity of collecting ordered or in-arbitrary-order is resolved using the CollectOrder parameter.
  • The api is extended by enabling new methods such as inspect.
  • Contribution guideline is revised.
  • GenericIterator is defined mainly for experimentation purposes. GenericIterator is a generalization of sequential iterators, rayon's parallel iterators and orx-parallel's parallel iterators. The computation can be defined as a composition of iterator transformations in the same way and can be executed with any of the variants.
  • ci.yml is added.
  • Fixes #16
  • Fixes #26
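
The description of xap above, as a generalization of one-to-one map, filter-map and flat-map, can be modeled sequentially with the standard library. This is only a conceptual sketch: the xap function below is hypothetical and is not the crate's internal representation. It treats every transformation as a function from one input to zero, one or many outputs.

```rust
/// Hypothetical model of xap: `f` maps each input to any number of outputs.
pub fn xap<T, O, V>(inputs: impl IntoIterator<Item = T>, f: impl Fn(T) -> V) -> Vec<O>
where
    V: IntoIterator<Item = O>,
{
    inputs.into_iter().flat_map(f).collect()
}
```

Returning Some(y) for every input behaves like map, returning an Option behaves like filter-map, and returning a Vec behaves like flat-map, all through the same generic code.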