Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Commit

Permalink
Fixing issue #48 issue #49 issue #50 issue #51
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Oct 22, 2013
1 parent d942b30 commit 7265876
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 70 deletions.
28 changes: 19 additions & 9 deletions doc/api/nodejs/nodejs.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ The Reactive Extensions for JavaScript provides integration points to the core N
### Callback Handlers ###

### <a id="rxnodefromcallbackfunc-scheduler-context"></a>`Rx.Node.fromCallback(func, [scheduler], [context])`
<a href="#rxnodefromcallbackfunc-scheduler-context">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L23-L41 "View in source")
<a href="#rxnodefromcallbackfunc-scheduler-context">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L27-L29 "View in source")

**Deprecated in favor of `Rx.Observable.fromCallback` in rx.async.js.**

Converts a callback function to an observable sequence.

Expand All @@ -43,7 +45,11 @@ Converts a callback function to an observable sequence.
var fs = require('fs');
var Rx = require('Rx');

var source = Rx.Node.fromCallback(fs.exists)('/etc/passwd');
// Wrap exists
var exists = Rx.Node.fromCallback(fs.exists);

// Call exists
var source = exists('/etc/passwd');

var observer = Rx.Observer.create(
function (x) {
Expand All @@ -70,7 +76,9 @@ var subscription = source.subscribe(observer);
* * *

### <a id="rxnodefromnodecallbackfunc-scheduler-context"></a>`Rx.Node.fromNodeCallback(func, [scheduler], [context])`
<a href="#rxnodefromcallbackfunc-scheduler-context">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L50-L75 "View in source")
<a href="#rxnodefromcallbackfunc-scheduler-context">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L41-L43 "View in source")

**Deprecated in favor of `Rx.Observable.fromNodeCallback` in rx.async.js.**

Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format.

Expand Down Expand Up @@ -116,10 +124,12 @@ var subscription = source.subscribe(observer);

### Event Handlers ###

### <a id="rxnodefromeventeventemitter-eventname"></a>`Rx.Node.fromEventEmitter(eventEmitter, eventName)`
<a href="#rxnodefromeventeventemitter-eventname">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L83-L95 "View in source")
### <a id="rxnodefromeventeventemitter-eventname"></a>`Rx.Node.fromEvent(eventEmitter, eventName)`
<a href="#rxnodefromeventeventemitter-eventname">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L54-L56 "View in source")

**Deprecated in favor of `Rx.Observable.fromEvent` in rx.async.js.**

Handles an event from the given EventEmitter as an observable sequence.
Handles an event from the given EventEmitter as an observable sequence.

#### Arguments
1. `eventEmitter` *(EventEmitter)*: The EventEmitter to subscribe to the given event.
Expand Down Expand Up @@ -163,7 +173,7 @@ emitter.emit('data', 'foo');
* * *

### <a id="rxnodetoeventemitterobservable-eventname"></a>`Rx.Node.toEventEmitter(observable, eventName)`
<a href="#rxnodetoeventemitterobservable-eventname">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L105-L122 "View in source")
<a href="#rxnodetoeventemitterobservable-eventname">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L66-L84 "View in source")

Converts the given observable sequence to an event emitter with the given event name.
The errors are handled on the 'error' event and completion on the 'end' event.
Expand Down Expand Up @@ -207,7 +217,7 @@ emitter.publish();
### Stream Handlers ###

### <a id="rxnodefromstreamstream"></a>`Rx.Node.fromStream(stream)`
<a href="#rxnodefromstreamstream">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L129-L153 "View in source")
<a href="#rxnodefromstreamstream">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L91-L115 "View in source")

Converts a flowing stream to an Observable sequence.

Expand Down Expand Up @@ -235,7 +245,7 @@ var subscription = Rx.Node.fromStream(process.stdin)
* * *

### <a id="rxnodewritetostreamobservable-stream-encoding"></a>`Rx.Node.writeToStream(observable, stream, [encoding])`
<a href="#rxnodewritetostreamobservable-stream-encoding">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L162-L175 "View in source")
<a href="#rxnodewritetostreamobservable-stream-encoding">#</a> [&#x24C8;](https://github.com/Reactive-Extensions/RxJS/blob/master/rx.node.js#L124-L138 "View in source")

Writes an observable sequence to a stream.

Expand Down
83 changes: 22 additions & 61 deletions rx.node.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
var Rx = require('./rx');
require('./rx.aggregates');
require('./rx.async');
require('./rx.binding');
require('./rx.coincidence');
require('./rx.experimental');
require('./rx.joinpatterns');
require('./rx.testing');
require('./rx.time');
require('./rx.virtualtime');
require('./rx.testing');

// Add specific Node functions
var EventEmitter = require('events').EventEmitter,
slice = Array.prototype.slice;
Observable = Rx.Observable;

Rx.Node = {
/**
* @deprecated Use Rx.Observable.fromCallback from rx.async.js instead.
*
* Converts a callback function to an observable sequence.
*
* @param {Function} function Function to convert to an asynchronous function.
Expand All @@ -22,77 +25,34 @@ Rx.Node = {
* @returns {Function} Asynchronous function.
*/
fromCallback: function (func, scheduler, context) {
scheduler || (scheduler = Rx.Scheduler.timeout);
return function () {
var args = slice.call(arguments, 0),
subject = new Rx.AsyncSubject();

scheduler.schedule(function () {
function handler() {
subject.onNext(arguments);
subject.onCompleted();
}

args.push(handler);
func.apply(context, args);
});

return subject.asObservable();
};
return Observable.fromCallback(func, scheduler, context);
},

/**
* @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
*
* Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format.
*
* @param {Function} func The function to call
* @param {Scheduler} [scheduler] Scheduler to run the function on. If not specified, defaults to Scheduler.timeout.
* @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined.
* @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array.
*/
fromNodeCallback: function (func, scheduler, context) {
scheduler || (scheduler = Rx.Scheduler.timeout);
return function () {
var args = slice.call(arguments, 0),
subject = new Rx.AsyncSubject();

scheduler.schedule(function () {
function handler(err) {
var handlerArgs = slice.call(arguments, 1);

if (err) {
subject.onError(err);
return;
}

subject.onNext(handlerArgs);
subject.onCompleted();
}

args.push(handler);
func.apply(context, args);
});

return subject.asObservable();
};
return Observable.fromNodeCallback(func, scheduler, context);
},

/**
* @deprecated Use Rx.Observable.fromEvent from rx.async.js instead.
*
* Handles an event from the given EventEmitter as an observable sequence.
*
* @param {EventEmitter} eventEmiiter The EventEmitter to subscribe to the given event.
* @param {String} eventName The event name to subscribe
* @returns {Observable} An observable sequence generated from the named event from the given EventEmitter.
*/
fromEvent: function (eventEmitter, eventName) {
return Rx.Observable.create(function (observer) {
function handler () {
observer.onNext(arguments);
}

eventEmitter.on(eventName, handler);

return function () {
eventEmitter.off(eventName, handler);
}
}).publish().refCount();
return Observable.fromEvent(eventEmitter, eventName);
},

/**
Expand All @@ -106,6 +66,7 @@ Rx.Node = {
toEventEmitter: function (observable, eventName) {
var e = new EventEmitter();

// Used to publish the events from the observable
e.publish = function () {
e.subscription = observable.subscribe(
function (x) {
Expand All @@ -128,7 +89,7 @@ Rx.Node = {
* @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
*/
fromStream: function (stream) {
return Rx.Observable.create(function (observer) {
return Observable.create(function (observer) {
function dataHandler (data) {
observer.onNext(data);
}
Expand All @@ -141,14 +102,14 @@ Rx.Node = {
observer.onCompleted();
}

stream.on('data', dataHandler);
stream.on('error', errorHandler);
stream.on('end', endHandler);
stream.addListener('data', dataHandler);
stream.addListener('error', errorHandler);
stream.addListener('end', endHandler);

return function () {
stream.off('data', dataHandler);
stream.off('error', errorHandler);
stream.off('end', endHandler);
stream.removeListener('data', dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener('end', endHandler);
};
}).publish().refCount();
},
Expand Down

0 comments on commit 7265876

Please sign in to comment.