'use strict';

var cluster = require('cluster');
var spawn = require('child_process').spawn;
var stream = require('stream');
var glob = require('glob');

var files = glob.sync('/data/logs/*.log.gz');

/**
 * Create an object-mode Transform stream that re-chunks its input into
 * lines and emits the result of `parse(line)` for each complete line.
 *
 * The returned stream exposes `parse` as an instance property so a caller
 * can replace it with a custom per-line parser.
 *
 * @returns {stream.Transform} a new line-splitting transform stream
 */
function Parser() {
  var t = new stream.Transform({ objectMode: true });
  // Install the hooks on the stream instance (the stream calls these itself).
  t._transform = transform;
  t._flush = flush;
  t.parse = parse;
  return t;
}

/**
 * Parse one line and push the result downstream.
 *
 * A `null`/`undefined` result is skipped rather than pushed: pushing `null`
 * would signal end-of-stream, so this lets `parse` act as a filter.
 *
 * @param {stream.Transform} self - the stream to push into
 * @param {string} line - a complete line without its trailing newline
 */
function emitLine(self, line) {
  var parsed = self.parse(line);
  if (parsed !== null && parsed !== undefined) {
    self.push(parsed);
  }
}

/**
 * `_transform` hook: split the incoming chunk into lines and emit each
 * complete one. Called with `this` bound to the Transform stream.
 *
 * @param {Buffer|string} chunk - raw data from upstream
 * @param {string} enc - encoding of `chunk` (unused)
 * @param {function} done - callback invoked once the chunk is processed
 */
function transform(chunk, enc, done) {
  var data = chunk.toString();
  // Prepend the partial line left over from the previous chunk, if any.
  if (this._lastLineData) {
    data = this._lastLineData + data;
  }

  var lines = data.split('\n');
  // The final element is either '' (chunk ended on a newline) or an
  // incomplete line; hold it back until the next chunk or flush completes it.
  this._lastLineData = lines.pop();

  for (var i = 0; i < lines.length; i++) {
    emitLine(this, lines[i]);
  }
  done();
}

/**
 * `_flush` hook: emit whatever partial line is still buffered once the
 * upstream has ended. Called with `this` bound to the Transform stream.
 *
 * @param {function} done - callback invoked once flushing is complete
 */
function flush(done) {
  if (this._lastLineData) {
    emitLine(this, this._lastLineData);
  }
  this._lastLineData = null;
  done();
}

/**
 * Default per-line parser: returns the line unchanged.
 *
 * Replace the body (or the stream's `parse` property) to split the line
 * into fields, or return `null` to filter a line out of the results.
 *
 * @param {string} line - one line of log text
 * @returns {*} the value to emit for this line
 */
function parse(line) {
  return line;
}

/**
 * Split `items` into at most `bucketCount` contiguous slices whose sizes
 * differ by no more than one. No empty slices are produced.
 *
 * @param {Array} items - the items to distribute
 * @param {number} bucketCount - maximum number of slices
 * @returns {Array<Array>} the slices, in order
 */
function distribute(items, bucketCount) {
  var count = Math.min(bucketCount, items.length);
  var slices = [];
  if (count <= 0) {
    return slices;
  }

  var perBucket = Math.floor(items.length / count);
  var remainder = items.length % count;
  var start = 0;
  for (var i = 0; i < count; i++) {
    // The first `remainder` buckets each take one extra item.
    var end = start + perBucket + (i < remainder ? 1 : 0);
    slices.push(items.slice(start, end));
    start = end;
  }
  return slices;
}

if (cluster.isMaster) {
  var workers = 24; // number of CPUs
  var logSlices = distribute(files, workers);

  for (var i = 0; i < logSlices.length; i++) {
    // Fork and pass the list of logs for the child to process. Environment
    // values are strings, so the list is comma-joined here and split again
    // in the worker (file paths are assumed not to contain commas).
    cluster.fork({ logs: logSlices[i].join(',') });
  }
} else {
  // Read the logs passed by the parent; an unset/empty value means no work.
  var logs = (process.env.logs || '').split(',').filter(Boolean);

  logs.forEach(function (log) {
    // One parser per file: a shared parser would be ended by whichever
    // zcat finishes first, and partial lines from different files would mix.
    var liner = Parser();
    var child = spawn('zcat', [log]);

    // Without a listener, a spawn failure raises an unhandled 'error'
    // event and takes the whole worker down.
    child.on('error', function (err) {
      console.error('Failed to run zcat on ' + log + ': ' + err.message);
    });

    child.stdout.pipe(liner);
    // Pipe `liner` to stdout or to a write stream to write to a file.
    // NOTE: until a consumer is attached, the parser's internal buffer
    // fills and the pipeline pauses.
  });
}