diff --git a/RxSwift/Observables/Reduce.swift b/RxSwift/Observables/Reduce.swift index d5fab3478..daa6d923d 100644 --- a/RxSwift/Observables/Reduce.swift +++ b/RxSwift/Observables/Reduce.swift @@ -8,6 +8,40 @@ extension ObservableType { + + /** + Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value. + + For aggregation behavior with incremental intermediate results, see `scan`. + + - seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html) + + - parameter seed: The initial accumulator value. + - parameter accumulator: A accumulator function to be invoked on each element. + - parameter mapResult: A function to transform the final accumulator value into the result value. + - returns: An observable sequence containing a single element with the final accumulator value. + */ + public func reduce(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void, mapResult: @escaping (A) throws -> Result) + -> Observable { + Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult) + } + + /** + Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value. + + For aggregation behavior with incremental intermediate results, see `scan`. + + - seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html) + + - parameter seed: The initial accumulator value. + - parameter accumulator: A accumulator function to be invoked on each element. + - returns: An observable sequence containing a single element with the final accumulator value. + */ + public func reduce(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void) + -> Observable { + Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 }) + } + /** Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value. @@ -22,7 +56,15 @@ extension ObservableType { */ public func reduce(_ seed: A, accumulator: @escaping (A, Element) throws -> A, mapResult: @escaping (A) throws -> Result) -> Observable { - Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult) + Reduce( + source: self.asObservable(), + seed: seed, + accumulator: { acc, element in + let currentAcc = acc + acc = try accumulator(currentAcc, element) + }, + mapResult: mapResult + ) } /** @@ -38,7 +80,15 @@ extension ObservableType { */ public func reduce(_ seed: A, accumulator: @escaping (A, Element) throws -> A) -> Observable { - Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 }) + Reduce( + source: self.asObservable(), + seed: seed, + accumulator: { acc, element in + let currenctAcc = acc + acc = try accumulator(currenctAcc, element) + }, + mapResult: { $0 } + ) } } @@ -60,7 +110,7 @@ final private class ReduceSink: Producer { - typealias AccumulatorType = (AccumulateType, SourceType) throws -> AccumulateType + typealias AccumulatorType = (inout AccumulateType, SourceType) throws -> Void typealias ResultSelectorType = (AccumulateType) throws -> ResultType private let source: Observable diff --git a/Tests/RxSwiftTests/Observable+ReduceTests.swift b/Tests/RxSwiftTests/Observable+ReduceTests.swift index 318dd7fde..c76b63577 100644 --- a/Tests/RxSwiftTests/Observable+ReduceTests.swift +++ b/Tests/RxSwiftTests/Observable+ReduceTests.swift @@ -22,7 +22,6 @@ extension ObservableReduceTest { .completed(250) ]) - let res = scheduler.start { xs.reduce(42, accumulator: +) } let correctMessages = Recorded.events( @@ -47,7 +46,7 @@ extension ObservableReduceTest { .completed(250) ]) - let res = scheduler.start { xs.reduce(42, accumulator: +) } + let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) } let correctMessages = Recorded.events( .next(250, 42 + 24), @@ -91,7 +90,7 @@ extension ObservableReduceTest { .next(150, 1), ]) - let res = scheduler.start { xs.reduce(42, accumulator: +) } + let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) } let correctMessages: [Recorded>] = [ ] @@ -146,9 +145,9 @@ extension ObservableReduceTest { ]) let res = scheduler.start { - xs.reduce(42) { (a: Int, x: Int) throws -> Int in + xs.reduce(into: 42) { (a: inout Int, x: Int) throws -> Void in if x < 3 { - return a + x + a += x } else { throw testError @@ -200,7 +199,7 @@ extension ObservableReduceTest { .completed(250) ]) - let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) } + let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) } let correctMessages = Recorded.events( .next(250, (42 + 24) * 5), @@ -244,7 +243,7 @@ extension ObservableReduceTest { .next(150, 1), ]) - let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) } + let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) } let correctMessages: [Recorded>] = [ ] @@ -298,7 +297,7 @@ extension ObservableReduceTest { .completed(260) ]) - let res = scheduler.start { xs.reduce(42, accumulator: { a, x in if x < 3 { return a + x } else { throw testError } }, mapResult: { $0 * 5 }) } + let res = scheduler.start { xs.reduce(into: 42, accumulator: { a, x in if x < 3 { a += x } else { throw testError } }, mapResult: { $0 * 5 }) } let correctMessages = [ Recorded.error(240, testError, Int.self) @@ -345,7 +344,7 @@ extension ObservableReduceTest { } func testReduceReleasesResourcesOnError() { - _ = Observable.just(1).reduce(0, accumulator: +).subscribe() + _ = Observable.just(1).reduce(into: 0, accumulator: +=).subscribe() } #endif }