1 var EventEmitter = require('events').EventEmitter;
// Every Lazy instance inherits the EventEmitter API (on/emit/removeListener)
// through this single EventEmitter object installed as the prototype.
3 Lazy.prototype = new EventEmitter;
// Lazy(em, opts): constructor for a lazy, event-driven list.
//   em   - object whose `_events` listener table is shared with the new
//          instance (presumably an EventEmitter or stream; confirm with callers).
//   opts - options object; its `data` and `pipe` keys are read below to pick
//          the event names this instance listens on and emits.
5 function Lazy (em, opts) {
// Allow calling Lazy(...) without `new`: re-enter as a constructor call.
6 if (!(this instanceof Lazy)) return new Lazy(em, opts);
// Give em a listener table if it has none, then alias that same object on the
// instance so both share one `_events` table.
// NOTE(review): em is dereferenced here, yet newLazy and bucket construct Lazy
// with em === null; confirm a guard protects these two lines.
9 if (!em._events) em._events = {};
10 self._events = em._events;
// once(name, f): register f so it handles only the next `name` event.
// Defined on the instance, so it takes precedence over the inherited
// EventEmitter `once`.
13 self.once = function (name, f) {
14 self.on(name, function g () {
// Unregister the wrapper first, so f is not re-entered if it emits `name`.
15 self.removeListener(name, g);
// Forward the listener's `this` and every event argument to f unchanged.
16 f.apply(this, arguments);
// Event names are configurable through opts; the fallbacks are 'data', 'pipe'
// and 'end'. A falsy option value also selects the fallback because of `||`.
21 var dataName = opts.data || 'data';
22 var pipeName = opts.pipe || 'pipe';
// NOTE(review): endName is derived from opts.pipe, not from a dedicated key
// such as opts.end. Supplying opts.pipe therefore makes endName equal to
// pipeName and skips the branch below. Looks like a copy/paste slip; confirm.
23 var endName = opts.pipe || 'end';
// When the two names differ, guarantee that an end event is followed by a
// pipe event if no pipe event has been seen yet.
25 if (pipeName != endName) {
// Remember that a pipe event has already been observed.
27 self.once(pipeName, function () { piped = true });
// On the first end event, synthesize the pipe event unless one already fired.
28 self.once(endName, function () {
29 if (!piped) self.emit(pipeName);
// push(x): feed a value into the list by emitting it as a data event on
// this instance.
33 self.push = function (x) {
34 self.emit(dataName, x);
// end(): takes no arguments.
// NOTE(review): presumably signals that no more values will be pushed; confirm.
37 self.end = function () {
// newLazy(g, h): create a derived Lazy that is fed from this instance's data
// events. Shared building block for the list operators defined below.
//   g - predicate invoked with the derived Lazy as `this` and the value as its
//       argument; the value is forwarded only when g returns a truthy result.
//       Defaults to a function that always returns true.
//   h - transform applied to each forwarded value. Defaults to identity.
41 function newLazy (g, h) {
42 if (!g) g = function () { return true };
43 if (!h) h = function (x) { return x };
// The derived instance reuses the same opts, hence the same event names.
44 var lazy = new Lazy(null, opts);
45 self.on(dataName, function (x) {
// g runs before h, so h only sees values that passed the predicate.
46 if (g.call(lazy, x)) lazy.emit(dataName, h(x));
// React to the first pipe event of the source only.
// NOTE(review): presumably re-emits the pipe event on `lazy`; confirm.
48 self.once(pipeName, function () {
// filter(f): returns a derived Lazy built by newLazy with a per-value
// predicate and the default identity transform.
// NOTE(review): presumably the predicate delegates to f(x); confirm.
54 self.filter = function (f) {
55 return newLazy(function (x) {
// forEach(f): returns a derived Lazy built by newLazy with a per-value
// callback in the predicate slot.
// NOTE(review): presumably calls f(x) for its side effect; confirm what the
// callback returns, since that decides whether values are forwarded.
60 self.forEach = function (f) {
61 return newLazy(function (x) {
// map(f): transform every value with f.
// The two functions below are an accept-everything predicate and a transform
// that calls f with the value only (no extra arguments are passed through).
// NOTE(review): presumably both are handed to newLazy(g, h); confirm.
67 self.map = function (f) {
69 function () { return true },
70 function (x) { return f(x) }
// head(f): observe the first value of the list.
// The data listener g removes itself from `lazy`, so it handles one value.
// NOTE(review): presumably f is called with that first value; confirm.
74 self.head = function (f) {
76 lazy.on(dataName, function g (x) {
78 lazy.removeListener(dataName, g)
// tail(): returns a derived Lazy built by newLazy with a stateful predicate
// that ignores the value argument.
// NOTE(review): presumably rejects only the first value; confirm.
82 self.tail = function () {
84 return newLazy(function () {
// skip(n): returns a derived Lazy built by newLazy with a stateful predicate
// that ignores the value argument.
// NOTE(review): presumably rejects the first n values; confirm.
93 self.skip = function (n) {
94 return newLazy(function () {
// take(n): returns a derived Lazy built by newLazy with a stateful predicate
// that ignores the value argument.
// NOTE(review): presumably n counts down per accepted value; confirm.
103 self.take = function (n) {
104 return newLazy(function () {
// When the counter is 0, emit the pipe event on the source instance. Loose
// equality means a numeric string "0" also matches.
105 if (n == 0) self.emit(pipeName);
// takeWhile(f): returns a derived Lazy that forwards values while f accepts
// them.
110 self.takeWhile = function (f) {
112 return newLazy(function (x) {
// Accept x only while the `cond` flag is still set and f(x) is truthy.
// f is not called once cond is falsy, because `&&` short-circuits.
// NOTE(review): presumably cond is cleared on the first rejection; confirm.
113 if (cond && f(x)) return true;
// foldr(op, i, f): fold the list with op, starting from i, and report via f.
// sum and product call op as op(value, accumulator).
// NOTE(review): presumably the final accumulator is passed to f; confirm.
120 self.foldr = function (op, i, f) {
// A pass-through derived Lazy (default predicate and transform) to observe.
122 var lazy = newLazy();
// Runs for every value that flows through.
123 lazy.on(dataName, function g (x) {
// Runs on the first pipe event of the derived Lazy only.
126 lazy.once(pipeName, function () {
// sum(f): fold the list with `+`, starting from 0, and hand f to foldr as the
// completion callback. Returns whatever foldr returns.
// `+` concatenates if the values are strings.
131 self.sum = function (f) {
132 return self.foldr(function (x, acc) { return x + acc }, 0, f);
// product(f): fold the list with `*`, starting from 1, and hand f to foldr as
// the completion callback. Returns whatever foldr returns.
135 self.product = function (f) {
136 return self.foldr(function (x, acc) { return x*acc }, 1, f);
// join(f): gather the list and hand the result to f in a single call.
139 self.join = function (f) {
// Per-value predicate on a derived Lazy.
// NOTE(review): presumably appends each x to `data`; confirm.
141 var lazy = newLazy(function (x) {
// On the first pipe event of the derived Lazy, call f with `data`.
145 lazy.once(pipeName, function () { f(data) });
// bucket(init, f): stateful regrouping of the list.
//   init - NOTE(review): presumably the initial value of `acc`; confirm.
//   f    - called as f.call(yield, acc, x) for every value x. Inside f, `this`
//          is the `yield` function, which emits on the derived Lazy. The value
//          f returns becomes the next accumulator.
149 self.bucket = function (init, f) {
// Output list; it shares opts, hence the same event names.
150 var lazy = new Lazy(null, opts);
// Emits one value on the output list.
// NOTE(review): `yield` is a reserved word in strict-mode code, so this name
// keeps the file from being loaded in strict mode.
151 var yield = function (x) {
152 lazy.emit(dataName, x);
157 self.on(dataName, function (x) {
// Replace the accumulator with whatever f returns for this value.
158 acc = f.call(yield, acc, x);
// Handles the first pipe event of the source only.
161 self.once(pipeName, function () {
165 // flush on end event
166 self.once(endName, function () {
// Merge whatever is still accumulated. mergeBuffers returns undefined for a
// non-array or empty array, in which case nothing is emitted.
167 var finalBuffer = mergeBuffers(acc);
168 if(finalBuffer) yield(finalBuffer);
// spilt(c): regroup incoming chunks into delimiter-separated pieces, using
// bucket with an array of pending Buffer slices as the accumulator.
//   c - delimiter string; only the char code of its first character is used.
// NOTE(review): the name looks like a typo for "split". It is a public
// property (the `lines` getter calls it), so renaming needs an alias.
174 self.spilt = function(c){
175 return self.bucket([], function (chunkArray, chunk) {
176 var newline = c.charCodeAt(0), lastNewLineIndex = 0;
// Normalize string chunks to Buffers so the indexing below compares bytes.
// NOTE(review): `new Buffer(...)` is deprecated in current Node.js in favor of
// Buffer.from / Buffer.alloc.
177 if (typeof chunk === 'string') chunk = new Buffer(chunk);
179 for (var i = 0; i < chunk.length; i++) {
180 if (chunk[i] === newline) {
181 // If we have content from the current chunk to append to our buffers, do it.
// NOTE(review): the guard is i>0, not i>lastNewLineIndex, so two adjacent
// delimiters push a zero-length slice.
182 if(i>0) chunkArray.push(chunk.slice(lastNewLineIndex, i));
184 // Wrap all our buffers and emit it.
// `this` is the bucket's yield function, so this emits on the output list.
185 this(mergeBuffers(chunkArray));
// The next piece starts right after the delimiter byte.
186 lastNewLineIndex = i + 1;
190 if(lastNewLineIndex>0) {
191 // New line found in the chunk, push the remaining part of the buffer.
192 if(lastNewLineIndex < chunk.length) chunkArray.push(chunk.slice(lastNewLineIndex));
194 // No new line found, push the whole buffer.
195 if(chunk.length) chunkArray.push(chunk);
201 // Streams that use this should emit strings or buffers only
// `lines` is a getter: each property access calls spilt("\n") again and
// returns that call's result, so repeated reads set up repeated splitters.
// Defined with the legacy __defineGetter__ API.
202 self.__defineGetter__('lines', function () {
203 return self.spilt("\n");
// Lazy.range(...): build a list of numbers. Accepted call shapes:
//   range(n)                  - one number: start at 0, bound n
//   range('start[,next]..[end]') - one string, optionally wrapped in
//                               '(' / '[' ... ']' bracket characters
//   range(start, end[, step]) - two or three arguments
// Throws Error when a string argument does not split into exactly two parts
// on '..', and when the argument count/type matches none of the shapes.
207 Lazy.range = function () {
208 var args = arguments;
210 var infinite = false;
212 if (args.length == 1 && typeof args[0] == 'number') {
213 var i = 0, j = args[0];
215 else if (args.length == 1 && typeof args[0] == 'string') { // 'start[,next]..[end]'
// A leading '(' sets startOpen; a trailing ']' sets endClosed.
217 var startOpen = false, endClosed = false;
218 if (arg[0] == '(' || arg[0] == '[') {
219 if (arg[0] == '(') startOpen = true;
222 if (arg.slice(-1) == ']') endClosed = true;
224 var parts = arg.split('..');
225 if (parts.length != 2)
226 throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");
228 if (parts[1] == '') { // 'start..'
232 else { // 'start[,next]..end'
233 var progression = parts[0].split(',');
234 if (progression.length == 1) { // start..end
// i and j are still strings here, since they come straight from split().
235 var i = parts[0], j = parts[1];
237 else { // 'start,next..end'
238 var i = progression[0], j = parts[1];
// The stride is the distance between the first two terms. The subtraction
// coerces both strings to numbers, and abs() makes the stride positive.
239 step = Math.abs(progression[1]-i);
// NOTE(review): presumably the open-start adjustment (skip the first term);
// confirm which condition guards this line.
247 if (infinite || i < j) i++;
256 else if (args.length == 2 || args.length == 3) { // start, end[, step]
257 var i = args[0], j = args[1];
258 if (args.length == 3) {
263 throw new Error("range takes 1, 2 or 3 arguments");
// Flag polled by the infinite generator below before each emission.
266 var stopInfinite = false;
// NOTE(review): 'pipe' and 'data' are hard-coded here, whereas instances use
// the configurable pipeName/dataName; fine only with default options.
267 lazy.on('pipe', function () {
// Emission is deferred to a later tick.
271 process.nextTick(function g () {
272 if (stopInfinite) return;
// Emit the current value, then advance by one (post-increment).
273 lazy.emit('data', i++);
// Finite case, also deferred to a later tick: count upward while i<j or
// downward while i>j. The bound j itself is excluded by both loops.
278 process.nextTick(function () {
280 for (; i<j; i+=step) {
285 for (; i>j; i-=step) {
// mergeBuffers(buffers): copy an array of Buffers, in order, into one newly
// allocated Buffer.
//   buffers - array of Buffers. It is emptied as a side effect, because the
//             loop below removes each entry with shift().
// Returns undefined when buffers is missing, not an array, or empty.
295 var mergeBuffers = function mergeBuffers(buffers) {
296 // We expect buffers to be a non-empty Array
297 if (!buffers || !Array.isArray(buffers) || !buffers.length) return;
299 var finalBufferLength, finalBuffer, currentBuffer, currentSize = 0;
301 // Sum all the buffers lengths
// `left` is the running numeric total (seeded with 0), so `left.length||left`
// yields the total itself.
// NOTE(review): for a zero-length entry, `right.length||right` evaluates to
// the Buffer object rather than 0, turning the sum into a string; verify how
// empty entries should be handled.
302 finalBufferLength = buffers.reduce(function(left, right) { return (left.length||left) + (right.length||right); }, 0);
// NOTE(review): `new Buffer(size)` is deprecated in current Node.js in favor
// of Buffer.alloc / Buffer.allocUnsafe.
303 finalBuffer = new Buffer(finalBufferLength);
304 while(buffers.length) {
// Take the next buffer off the front and copy it at the current write offset.
305 currentBuffer = buffers.shift();
306 currentBuffer.copy(finalBuffer, currentSize);
307 currentSize += currentBuffer.length;