From 7265876c4acfc182454ae842fd0eadb50894f5cd Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Tue, 22 Oct 2013 11:18:34 -0400 Subject: [PATCH] Fixing issue #48 issue #49 issue #50 issue #51 --- doc/api/nodejs/nodejs.md | 28 +++++++++----- rx.node.js | 83 +++++++++++----------------------------- 2 files changed, 41 insertions(+), 70 deletions(-) diff --git a/doc/api/nodejs/nodejs.md b/doc/api/nodejs/nodejs.md index 12bebd51a..dd1a62ae4 100644 --- a/doc/api/nodejs/nodejs.md +++ b/doc/api/nodejs/nodejs.md @@ -26,7 +26,9 @@ The Reactive Extensions for JavaScript provides integration points to the core N ### Callback Handlers ### ### `Rx.Node.fromCallback(func, [scheduler], [context])` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L23-L41 "View in source") +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L27-L29 "View in source") + +**Deprecated in favor of `Rx.Observable.fromCallback` in rx.async.js.** Converts a callback function to an observable sequence. @@ -43,7 +45,11 @@ Converts a callback function to an observable sequence. var fs = require('fs'); var Rx = require('Rx'); -var source = Rx.Node.fromCallback(fs.exists)('/etc/passwd'); +// Wrap exists +var exists = Rx.Node.fromCallback(fs.exists); + +// Call exists +var source = exists('/etc/passwd'); var observer = Rx.Observer.create( function (x) { @@ -70,7 +76,9 @@ var subscription = source.subscribe(observer); * * * ### `Rx.Node.fromNodeCallback(func, [scheduler], [context])` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L50-L75 "View in source") +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L41-L43 "View in source") + +**Deprecated in favor of `Rx.Observable.fromNodeCallback` in rx.async.js.** Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format. @@ -116,10 +124,12 @@ var subscription = source.subscribe(observer); ### Event Handlers ### -### `Rx.Node.fromEventEmitter(eventEmitter, eventName)` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L83-L95 "View in source") +### `Rx.Node.fromEvent(eventEmitter, eventName)` +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L54-L56 "View in source") + +**Deprecated in favor of `Rx.Observable.fromEvent` in rx.async.js.** -Handles an event from the given EventEmitter as an observable sequence. +Handles an event from the given EventEmitter as an observable sequence. #### Arguments 1. `eventEmitter` *(EventEmitter)*: The EventEmitter to subscribe to the given event. @@ -163,7 +173,7 @@ emitter.emit('data', 'foo'); * * * ### `Rx.Node.toEventEmitter(observable, eventName)` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L105-L122 "View in source") +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L66-L84 "View in source") Converts the given observable sequence to an event emitter with the given event name. The errors are handled on the 'error' event and completion on the 'end' event. @@ -207,7 +217,7 @@ emitter.publish(); ### Stream Handlers ### ### `Rx.Node.fromStream(stream)` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L129-L153 "View in source") +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L91-L115 "View in source") Converts a flowing stream to an Observable sequence. @@ -235,7 +245,7 @@ var subscription = Rx.Node.fromStream(process.stdin) * * * ### `Rx.Node.writeToStream(observable, stream, [encoding])` -# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L162-L175 "View in source") +# [Ⓢ](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L124-L138 "View in source") Writes an observable sequence to a stream. diff --git a/rx.node.js b/rx.node.js index 6722323d8..d8dccf438 100644 --- a/rx.node.js +++ b/rx.node.js @@ -1,19 +1,22 @@ var Rx = require('./rx'); require('./rx.aggregates'); +require('./rx.async'); require('./rx.binding'); require('./rx.coincidence'); require('./rx.experimental'); require('./rx.joinpatterns'); -require('./rx.testing'); require('./rx.time'); require('./rx.virtualtime'); +require('./rx.testing'); // Add specific Node functions var EventEmitter = require('events').EventEmitter, - slice = Array.prototype.slice; + Observable = Rx.Observable; Rx.Node = { /** + * @deprecated Use Rx.Observable.fromCallback from rx.async.js instead. + * * Converts a callback function to an observable sequence. * * @param {Function} function Function to convert to an asynchronous function. @@ -22,77 +25,34 @@ Rx.Node = { * @returns {Function} Asynchronous function. */ fromCallback: function (func, scheduler, context) { - scheduler || (scheduler = Rx.Scheduler.timeout); - return function () { - var args = slice.call(arguments, 0), - subject = new Rx.AsyncSubject(); - - scheduler.schedule(function () { - function handler() { - subject.onNext(arguments); - subject.onCompleted(); - } - - args.push(handler); - func.apply(context, args); - }); - - return subject.asObservable(); - }; + return Observable.fromCallback(func, scheduler, context); }, /** + * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead. + * * Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format. + * * @param {Function} func The function to call * @param {Scheduler} [scheduler] Scheduler to run the function on. If not specified, defaults to Scheduler.timeout. * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined. * @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array. */ fromNodeCallback: function (func, scheduler, context) { - scheduler || (scheduler = Rx.Scheduler.timeout); - return function () { - var args = slice.call(arguments, 0), - subject = new Rx.AsyncSubject(); - - scheduler.schedule(function () { - function handler(err) { - var handlerArgs = slice.call(arguments, 1); - - if (err) { - subject.onError(err); - return; - } - - subject.onNext(handlerArgs); - subject.onCompleted(); - } - - args.push(handler); - func.apply(context, args); - }); - - return subject.asObservable(); - }; + return Observable.fromNodeCallback(func, scheduler, context); }, /** + * @deprecated Use Rx.Observable.fromEvent from rx.async.js instead. + * * Handles an event from the given EventEmitter as an observable sequence. + * * @param {EventEmitter} eventEmiiter The EventEmitter to subscribe to the given event. * @param {String} eventName The event name to subscribe * @returns {Observable} An observable sequence generated from the named event from the given EventEmitter. */ fromEvent: function (eventEmitter, eventName) { - return Rx.Observable.create(function (observer) { - function handler () { - observer.onNext(arguments); - } - - eventEmitter.on(eventName, handler); - - return function () { - eventEmitter.off(eventName, handler); - } - }).publish().refCount(); + return Observable.fromEvent(eventEmitter, eventName); }, /** @@ -106,6 +66,7 @@ Rx.Node = { toEventEmitter: function (observable, eventName) { var e = new EventEmitter(); + // Used to publish the events from the observable e.publish = function () { e.subscription = observable.subscribe( function (x) { @@ -128,7 +89,7 @@ Rx.Node = { * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events. */ fromStream: function (stream) { - return Rx.Observable.create(function (observer) { + return Observable.create(function (observer) { function dataHandler (data) { observer.onNext(data); } @@ -141,14 +102,14 @@ Rx.Node = { observer.onCompleted(); } - stream.on('data', dataHandler); - stream.on('error', errorHandler); - stream.on('end', endHandler); + stream.addListener('data', dataHandler); + stream.addListener('error', errorHandler); + stream.addListener('end', endHandler); return function () { - stream.off('data', dataHandler); - stream.off('error', errorHandler); - stream.off('end', endHandler); + stream.removeListener('data', dataHandler); + stream.removeListener('error', errorHandler); + stream.removeListener('end', endHandler); }; }).publish().refCount(); },