//filter will reemit the data if cb(err,pass) pass is truthy // reduce is more tricky // maybe we want to group the reductions or emit progress updates occasionally // the most basic reduce just emits one 'data' event after it has recieved 'end' var Stream = require('stream').Stream //create an event stream and apply function to each .write //emitting each response as data //unless it's an empty callback module.exports = function (mapper) { var stream = new Stream() , inputs = 0 , outputs = 0 , ended = false , paused = false , destroyed = false stream.writable = true stream.readable = true stream.write = function () { if(ended) throw new Error('map stream is not writable') inputs ++ var args = [].slice.call(arguments) , r , inNext = false //pipe only allows one argument. so, do not function next (err) { if(destroyed) return inNext = true outputs ++ var args = [].slice.call(arguments) if(err) { args.unshift('error') return inNext = false, stream.emit.apply(stream, args) } args.shift() //drop err if (args.length) { args.unshift('data') r = stream.emit.apply(stream, args) } if(inputs == outputs) { if(paused) paused = false, stream.emit('drain') //written all the incoming events if(ended) end() } inNext = false } args.push(next) try { //catch sync errors and handle them like async errors var written = mapper.apply(null, args) paused = (written === false) return !paused } catch (err) { //if the callback has been called syncronously, and the error //has occured in an listener, throw it again. if(inNext) throw err next(err) return !paused } } function end (data) { //if end was called with args, write it, ended = true //write will emit 'end' if ended is true stream.writable = false if(data !== undefined) return stream.write(data) else if (inputs == outputs) //wait for processing stream.readable = false, stream.emit('end'), stream.destroy() } stream.end = function (data) { if(ended) return end() } stream.destroy = function () { ended = destroyed = true stream.writable = stream.readable = paused = false process.nextTick(function () { stream.emit('close') }) } stream.pause = function () { paused = true } stream.resume = function () { paused = false } return stream }