Creates promise-based transform streams.
.map() creates a stream that converts objects to promises.
.resolve() resolves all these promises.
The main use case for this is to create "concurrent" transforms that preserve order. This module does not apply any concurrency limits! However, you could play with the buffer sizes, which indirectly relate to concurrency in this context.
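To make the idea concrete, here is a minimal sketch of the map-then-resolve pattern built only on Node's core stream module. It is not this module's implementation: mapToPromises, resolveInOrder, slowDouble and run are hypothetical names, and the highWaterMark values are arbitrary. The first stage starts work for each object as soon as it arrives, the second stage waits for the promises in arrival order, and the number of promises that can be buffered between the two stages is what loosely bounds concurrency.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for a "map" stage: starts the work for each chunk
// and pushes the pending promise downstream without waiting for it.
function mapToPromises (fn, highWaterMark) {
  return new Transform({
    objectMode: true,
    highWaterMark: highWaterMark, // how many promises may be buffered
    transform (chunk, encoding, callback) {
      callback(null, Promise.resolve(fn(chunk)))
    }
  })
}

// Hypothetical stand-in for a "resolve" stage: waits for each promise in the
// order it arrived, so output order matches input order.
function resolveInOrder (highWaterMark) {
  return new Transform({
    objectMode: true,
    highWaterMark: highWaterMark,
    transform (promise, encoding, callback) {
      promise.then(function (value) { callback(null, value) }, callback)
    }
  })
}

// Later inputs finish sooner, so completion order differs from input order.
function slowDouble (n) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(n * 2) }, (4 - n) * 10)
  })
}

// Runs the pipeline and resolves with the collected results.
function run () {
  return new Promise(function (resolve, reject) {
    const results = []
    Readable.from([1, 2, 3])
      .pipe(mapToPromises(slowDouble, 4))
      .pipe(resolveInOrder(4))
      .on('data', function (value) { results.push(value) })
      .on('end', function () { resolve(results) })
      .on('error', reject)
  })
}
```

Calling run() resolves with the doubled values in input order, even though the last item finishes first. Lowering the highWaterMark values reduces how many promises can be in flight at once, which is the indirect link between buffer size and concurrency mentioned above.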
Suppose you have MongoDB documents that look like this:
    var post = {
      creator_id: ObjectId('123412341234123412341234'),
      title: 'some title',
      markdown: 'blah'
    }
You want to attach the .creator to each post, but you still want to stream all the results.
Here's how to do it:
    var PStream = require('promise-transform-stream')
    var JSONStream = require('JSONStream')
    var zlib = require('zlib')

    // db and res come from your app: a database handle and an HTTP response
    db.posts.find({}).stream()
      // get the creator of each post, returns a stream of promises
      .pipe(PStream.map(getCreator))
      // resolves each promise in the stream in order
      .pipe(PStream.resolve())
      // now we can serialize the results and send them to the client!
      .pipe(JSONStream.stringify())
      .pipe(zlib.createGzip())
      .pipe(res)

    // look up the post's creator and attach it to the post
    function getCreator(post) {
      return new Promise(function (resolve, reject) {
        db.users.findOne({
          _id: post.creator_id
        }, function (err, user) {
          if (err) return reject(err)
          post.creator = user
          resolve(post)
        })
      })
    }
To be more efficient, you'd probably want to query each creator_id only once per request, rather than once per post.
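One way to do that is to cache the lookup promise per creator_id. The sketch below is an assumption about how you might structure it, not part of this module: memoizeLookup, findUser and makeGetCreator are hypothetical names, and findUser stands in for a real db.users.findOne() call.

```javascript
// Hypothetical helper: wraps a promise-returning lookup so each distinct key
// is fetched at most once per cache instance.
function memoizeLookup (lookup) {
  const cache = new Map()
  return function (key) {
    const id = String(key) // ObjectId values stringify to their hex form
    if (!cache.has(id)) cache.set(id, lookup(key))
    return cache.get(id)
  }
}

// Stand-in for a real database query; counts how often it is called.
let queries = 0
function findUser (creatorId) {
  queries++
  return Promise.resolve({ _id: creatorId, name: 'user ' + creatorId })
}

// Build one getCreator per request so the cache is scoped to that request.
function makeGetCreator () {
  const getUser = memoizeLookup(findUser)
  return function getCreator (post) {
    return getUser(post.creator_id).then(function (user) {
      post.creator = user
      return post
    })
  }
}
```

A function returned by makeGetCreator() can be passed to PStream.map() in place of the getCreator shown earlier. Note that this sketch also caches rejected promises, so a failed lookup fails every post that shares that creator_id.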
PStream.create() creates a new Transform constructor with ._transform and ._flush methods.
The transform function should resolve to the mapped document.
The flush function should not take any arguments.
    var Transform = PStream.create(function (doc) {
      return Promise.resolve(doc)
    })
    db.posts.find({}).stream()
      .pipe(Transform())
.map() is the same as .create(), except it returns the transform instance immediately, through2-style.
This is not recommended!
.resolve() returns a resolving transform stream.