Clustering in Node.js (Part 1) – Round-robin load balancing in a node cluster

As we start to scale out our analytics service here, we’ve started thinking of ways to leverage node’s very good event-driven model across multiple cores and eventually multiple machines. Node clustering is still in its nascent stages, so there’s not a whole lot of functionality and it relies on OS-level forking to “scale out.” While this works in principle, it means that clustering in node is not native per se and that node’s cluster module essentially just provides a weak layer of abstraction across several node processes that are effectively launched from your command line.

Although node already implements load balancing for HTTP requests in its cluster module, it doesn’t handle load balancing for other protocols. Also, node’s original load balancing had a few quirks because it relied on the OS’s scheme for waking up dormant processes (see http://strongloop.com/strongblog/whats-new-in-node-js-v0-12-cluster-round-robin-load-balancing/). Hence, in Node.js v0.11.2, the developers at Joyent decided to move back to a round-robin scheme. Round-robin is not terribly sophisticated and makes no judgment as to the type of work a worker is currently processing. It works well when each request requires roughly the same amount of work, but not so well when the incoming work varies in nature. At least it’s conceptually simple and simple to implement.
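To make that trade-off concrete, here is a minimal sketch of round-robin selection. The helper name `makeRoundRobin` and the cost numbers are assumptions made up for illustration; this is not how node's cluster module is implemented internally.

```javascript
// Hypothetical helper: returns a function that hands out worker indices in rotation.
function makeRoundRobin(workerCount) {
	var counter = 0;
	return function next() {
		var index = counter % workerCount;
		counter++;
		return index;
	};
}

// Simulate six requests of uneven cost spread over three workers.
var costs = [10, 1, 1, 10, 1, 1];
var load = [0, 0, 0];
var next = makeRoundRobin(load.length);
costs.forEach(function (cost) {
	load[next()] += cost;	// the scheduler never looks at cost, only at position
});

console.log(load);	// worker 0 ends up with both expensive requests
```

Every worker receives the same number of requests, but because the scheduler never looks at cost, one worker can end up with most of the actual work.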

My first major stumbling block when using node’s cluster module was the fact that this load balancing wasn’t supported for UDP.  Furthermore, we receive analytics data using a range of protocols (e.g. SFTP, TCP/IP, FTP, etc.) so I didn’t want to rely on node’s native libraries in case no one thought it important enough to extend load balancing to a variety of protocols. Thus, I had to come up with my own generalized architecture for implementing load balancing. Have a look:


var dgram = require('dgram'),
	cluster = require('cluster'),
	WorkerPrototype = require('../workers/WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	net = require('net');

// Private Functions
var protocols = {
	'udp': {
		'start': listen_udp,
		'stop': shutdown_udp
	},
	'tcp_server': {
		'start': open_tcp_server,
		'stop': shutdown_tcp_server
	},
	'tcp_client': {
		'start': open_tcp_client,
		'stop': shutdown_tcp_client
	}
};

function listen(callback) {
	protocols[this.protocol].start.call(this, callback);
}

function shutdown(callback) {
	protocols[this.protocol].stop.call(this, callback);
}

//This function will restart the server every retry interval if restart is enabled
function restart(callback) {
	var self = this;
	console.log('[%s] Trying restart in %d ms ...', self.name, self.retrydelay);
	setTimeout(function() {
		self.retrydelay = Math.floor(self.retrydelay * 1.5);
		listen.call(self, callback);
	}, self.retrydelay );
}

//UDP Handling
function listen_udp(callback) {
	var self = this;
	var server = dgram.createSocket('udp4');
	server.on('error', function (err) {
		console.log('[%s] Error:\n' + err.stack, self.name);
		try {
			server.close();
		} catch (closeErr) {	//named closeErr so it does not shadow the original socket error
			console.log('[%s] Error closing udp socket: %s', self.name, closeErr);
		} finally {
			if (self.restart) {
				restart.call(self, callback);
			} else {	//otherwise, just calls callback
				if (callback)
					callback(err);
			}
		}
	});

    server.on('message', function (msg, rinfo) {
		console.log('[%s] Received a message from ' + rinfo.address + ':' + rinfo.port + ' @ [%s] of size [%d] bytes.', self.name, new Date(), msg.length);
		self.counter++;
		self.distribute.call(self, self.workers, {
			data: msg, 
			rinfo: rinfo
		});
	});

	server.bind(self.port, function() {
	    console.log('[%s] Server opening socket and listening on port ' + self.port, self.name);
	});

	server.on('listening', function () {
	  	var address = server.address();
	  	var msg = '[' + self.name + ']  SUCCESS! UDP socket now listening @ ' +
	      	address.address + ':' + address.port;
	  	if (callback) {
	  		self.retrydelay = self.restartdelay;
	  		callback(null, msg);
	  	}
	});

	self.server = server;
	return server;
}

function shutdown_udp(callback) {
	var self = this;
	if (self.server) {
		self.server.close();
		var msg = '[' + self.name + '] Load Balancer shutdown successfully.';
		if (callback)
			callback(null, msg);
		
	} else {
		var err = '[' + self.name + '] Load Balancer not defined!  Could not be shutdown.';
		if (callback)
			callback(err);
	}
}

//TCP Server - accepts incoming TCP connections
function open_tcp_server(callback) {
   //implementation goes here
}

function shutdown_tcp_server(callback) {
   //implementation goes here
}

function open_tcp_client(callback) {
   //implementation goes here
}

function shutdown_tcp_client(callback) {
   //implementation goes here
}

//Private Load Balancer
function instantiateWorkers() {
	for (var i = 0; i < this.instances; i++) {
		console.log('Instantiating Worker %s %d', this.worker_name, i);
		var worker = cluster.fork({
			name: this.worker_name,
			id: i
		});
	    this.workers.push(worker);
	}
}

/**
* Load Balancer is a class that wraps functionality for opening/closing a UDP or TCP connection
* It also implements default load balancing based on the number of workers specified.
*
* @class LoadBalancer
* @constructor
* @param {String} name The name for this load balancer
* @param {Number} port The port number
* @param {String} protocol Either udp or tcp at the moment, can be extended later on
* @param {String} worker_name Give each worker a name
* @param {Number} instances The number of worker processes to be "forked". Each worker only receives the messages the master hands to it
* @param {Boolean} retryonerror If true, try to reopen the connection after an error
*/
var LoadBalancer = function(name, port, protocol, worker_name, instances, retryonerror) {

	if (!port || !protocol) {
		throw new Error('Could not instantiate load balancer.  Port/Protocol/Workers undefined.');
	}

	if (instances <= 0) {
		throw new Error('Not enough worker instances for workers.  Failed to start.');
	}

	if (!protocols[protocol]) {
		throw new Error('Could not instantiate load balancer.  Unknown protocol ' + protocol);
	}

	//Set by User
	this.name = name;
	this.port = port;
	this.protocol = protocol;
	this.worker_name = worker_name;
	this.instances = instances;
	this.restart = retryonerror ? retryonerror : false;	//if true, will try to reopen connection upon error
	this.restartdelay = 5000;	//will multiply by 1.5 each time and reset when connection is reestablished

	//Private members
	this.workers = [];
	this.server;
	this.counter = 0;
	this.retrydelay = this.restartdelay;

	var self=this;

	this.start = function(callback) {
		listen.call(self, function(err, msg) {
			self.counter = 0;	//reset counter
			if (!err)
				instantiateWorkers.call(self);

			if (callback) {
				callback(err, msg);
			}
		});
	}

	this.stop = function(callback) {
		shutdown.call(this, function(err, msg) {
			if (err) 
				console.log(err);
			if (callback)
				callback(err, msg);
		});
	}

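	//Named forceRestart so that it does not overwrite the this.restart flag set above
	this.forceRestart = function() {
		self.stop(function(err, msg) {
			if (err) {
				throw new Error(err);
			} else {
				self.start();	//use self: "this" is not the load balancer inside the callback
			}
		});
	}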
};

/**
 * Default implementation of distribute - sequential cycling through workers
 *
 * @method distribute
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.  
 * @param {Object} data An object containing the data received on the port for this Load Balancer.  Ex:  UDP would send data {data: msg, rinfo: rinfo}
 */
LoadBalancer.prototype.distribute = function(workers, data) {
	var self = this;
	var worker = workers[self.counter % workers.length];
	try {
		if (worker) {
			worker.send(data);
		}
	} catch (e) {
		if (e instanceof WorkerDownError) {
			console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
			workers.splice(workers.indexOf(worker), 1);
		} else {
			throw e;
		}
	}
};

module.exports = LoadBalancer;

Basically, this prototype does a few things:

    1. Implements generalized servers by protocol to receive data

You can see that when one instantiates or extends the load balancer prototype, the user will need to specify the protocol to use. Since things like ports and instances of workers are common to all load balancers, these are private attributes of the load balancer instance. Essentially, all servers under this model work in the same way. A master server is instantiated which receives all data over a socket. When the server receives a request or message, it will pass it off to one of N workers (specified by the user) which is running in its own process. That worker then does the work.

    2. Implements default methods to start, stop and restart servers

A user doesn’t need to concern herself with starting, stopping and restarting servers should a socket go down. That’s why there is some dummy logic for reopening a socket upon error (e.g. if the socket is flooded and closes). The default mechanism is to try reopening the socket after 5 seconds, with the delay increasing by 50% each time the socket fails to open.
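As a rough illustration of that retry schedule, the sketch below lists the successive delays. `backoffDelays` is a hypothetical helper, not part of the load balancer; it only mirrors the 5000 ms starting point and the 1.5x multiplier (with Math.floor) used in restart().

```javascript
// Hypothetical helper mirroring the retry logic: each delay is 1.5x the previous one, floored.
function backoffDelays(initialDelay, factor, attempts) {
	var delays = [];
	var delay = initialDelay;
	for (var i = 0; i < attempts; i++) {
		delays.push(delay);
		delay = Math.floor(delay * factor);
	}
	return delays;
}

console.log(backoffDelays(5000, 1.5, 5));	// [ 5000, 7500, 11250, 16875, 25312 ]
```

Note that there is no upper bound in this scheme, so a socket that stays down for a long time will be retried less and less often until the delay is reset on a successful listen.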

    3. Can take an arbitrary number of “Workers” to handle the load

The load balancer wouldn’t be of much use if it handled the entire workload by itself. This prototype (and extended versions of it) will actually instantiate itself N times. I found this to be a quirk of node: launching a separate script (by changing the cluster settings) instead of launching the same script N times prevented me from communicating with the worker processes. For example, I would launch the load balancer and pass in four worker processes that lived in separate javascript files. The main load balancer could not communicate with them, even when I passed a reference to the cluster to each worker process. The only way I could get communication to work was if the processes were forked from the same node cluster. I thought this behavior was really strange, and it makes for some weird-looking code (i.e. the default implementation of node clustering, which contains a massive if statement in a single javascript file that represents not only the cluster but also all of its workers).

    4. Offers a generic round-robin load balancer irrespective of protocol

The distribute function bound to the load balancer prototype is called whenever the data is received by the master process. I added a default implementation so users wouldn’t have to implement their own method using the prototype out of the box, but it would be easy to overwrite. If you, as the user, wanted to implement a different type of load balancing, you would just need to bind a new distribute() function to the prototype and things would work out of the box.
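As one example of a replacement, the sketch below routes each message by the sender's address, so packets from the same source always land on the same worker. Everything here is an assumption for illustration: `hashAddress`, `stickyDistribute` and the fake workers are made-up names, and the only thing taken from the prototype is the (workers, data) signature with data shaped like {data: msg, rinfo: rinfo}.

```javascript
// Hypothetical string hash: a simple polynomial hash kept as an unsigned 32-bit integer.
function hashAddress(address) {
	var hash = 0;
	for (var i = 0; i < address.length; i++) {
		hash = (hash * 31 + address.charCodeAt(i)) >>> 0;
	}
	return hash;
}

// Hypothetical replacement for distribute(): pick the worker from the sender address.
function stickyDistribute(workers, data) {
	if (workers.length === 0) {
		return;
	}
	var worker = workers[hashAddress(data.rinfo.address) % workers.length];
	worker.send(data);
}

// Stand-in workers for illustration; real ones would come from cluster.fork().
function makeFakeWorker() {
	var received = [];
	return { received: received, send: function (data) { received.push(data); } };
}

var fakeWorkers = [makeFakeWorker(), makeFakeWorker(), makeFakeWorker()];
['1.1.0.1', '1.1.0.2', '1.1.0.1', '1.1.0.1'].forEach(function (address) {
	stickyDistribute(fakeWorkers, { data: 'payload', rinfo: { address: address, port: 65432 } });
});
```

To use something like this, you would assign it to the prototype of your extended load balancer (for example, UDPLoadBalancer.prototype.distribute = stickyDistribute). The trade-off is that a few chatty senders can overload one worker, which plain round-robin avoids.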

The last implementation step for the user is to extend the load balancer and specify what the workers do whenever data is received across the wire. A default implementation might look like this:

var cluster = require('cluster'),
	LoadBalancer = require('./LoadBalancerPrototype.js'),
	UDPWorker = require('../workers/UDPWorker.js');

var UDPLoadBalancer = function(name, port, worker_name, instances, retryonerror) {
	LoadBalancer.call(this, name, port, 'udp', worker_name, instances, retryonerror);

	console.log('Instantiating UDP Load Balancer on port %d with %d workers', port, instances);
};

UDPLoadBalancer.prototype = Object.create(LoadBalancer.prototype);
UDPLoadBalancer.prototype.name = 'UDP Load Balancer';
//CONFIG: To be set by the user, define the type of worker
UDPLoadBalancer.prototype.worker = UDPWorker;


// Custom distribution function - sequential.
// Overwrite this function to specify how you want to distribute the load.
// parameters msg, rinfo change depending on the protocol of the Load Balancer
// This is just an example - it is not strictly necessary

// UDPLoadBalancer.prototype.distribute = function(workers, msg, rinfo) {
// };


if (cluster.isMaster) {
	var port = 5005, instances = 2, retry = false;
	for (var i = 0; i < process.argv.length; i++) {
		if (process.argv[i] == '-p') {
			//argv values are strings, so convert to a number; keep the default if missing or invalid
			port = parseInt(process.argv[i+1], 10) || port;
		}

		if (process.argv[i] == '-i') {
			instances = parseInt(process.argv[i+1], 10) || instances;
		}

		if (process.argv[i] == '-r') {
			retry = true;
		}
	}

	var udpLoadBalancer = new UDPLoadBalancer('RTCP Balancer 1', port, UDPWorker.prototype.name, instances, retry);
	udpLoadBalancer.start(function(err, msg) {
		if (err) {
			console.log(UDPLoadBalancer.prototype.name + ' error starting up: ' + err);
			udpLoadBalancer.stop(function(err, msg) {
				console.log('UDP load balancer shut down: ' + err + '   ' + msg);
			});
		} else {
			//successful
		}
	});

} else {
	var worker = new UDPLoadBalancer.prototype.worker(cluster.worker, cluster.worker.id);

	cluster.worker.on('online', function() {
		console.log('[%s:%d] is online.', worker.name, worker.instance);
	})
	//msg is of format {data: udp_msg, rinfo: rinfo}
	cluster.worker.on('message', function(msg) {
		worker.doWork(msg.data, msg.rinfo);
	});
}

module.exports = UDPLoadBalancer;

And here’s an example implementation of a worker that handles the load:

var WorkerPrototype = require('./WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	Decoder = require('../controllers/Decoder.js');

var UDPWorker = function(worker, instance) {
	WorkerPrototype.call(this, worker, UDPWorker.prototype.name, instance);

        //implementations need to specify what work to do upon receiving a message
	this.doWork = function(msg, rinfo) {
		if (!this.worker || this.worker.suicide === true) {
			throw new WorkerDownError('[%s:%d] Worker is down. Not doing work.', this.name, this.instance);
		}
		console.log('[%s:%d] Doing work [%d].', this.name, this.instance, this.counter);
	}
};

UDPWorker.prototype.name = 'RTCP Worker';
UDPWorker.prototype.getPath = function () { 
	var fullPath = __filename; 
	return fullPath; 
}

module.exports = UDPWorker;

That’s pretty much all there is to it. You can see an example of messages being sent across the wire and how load is distributed for UDP:

Instantiating Worker UDP Worker 0
Instantiating Worker UDP Worker 1
Instantiating Worker UDP Worker 2
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:35 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866055940.
[UDP Balancer 1] Received a message from 1.1.0.2:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056190.
[UDP Balancer 1] Received a message from 1.1.0.3:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056439.
[UDP Balancer 1] Received a message from 1.1.0.4:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056687.
[UDP Balancer 1] Received a message from 1.1.0.5:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056937.
[UDP Balancer 1] Received a message from 1.1.0.6:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866057187.
[UDP Balancer 1] Received a message from 1.2.0.1:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057214.
[UDP Balancer 1] Received a message from 1.1.0.7:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057437.
[UDP Balancer 1] Received a message from 1.2.0.2:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057463.
[UDP Balancer 1] Received a message from 1.1.0.8:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057687.
[UDP Balancer 1] Received a message from 1.2.0.3:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057714.
[UDP Balancer 1] Received a message from 1.1.0.9:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057938.
[UDP Balancer 1] Received a message from 1.2.0.4:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866057965.
[UDP Balancer 1] Received a message from 1.1.0.10:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058188.
[UDP Balancer 1] Received a message from 1.2.0.5:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058215.
[UDP Balancer 1] Received a message from 1.1.0.11:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058437.
[UDP Balancer 1] Received a message from 1.2.0.6:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058465.
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058655.
[UDP Balancer 1] Received a message from 1.1.0.12:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [6].
[DECODER]: Decoding udp message with timestamp: 1409866058689.
[UDP Balancer 1] Received a message from 1.2.0.7:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [6].

MongoDB performance enhancements and tweaks


While building a real-time analytics engine, I’ve formed some opinions on how well mongoDB is suited for scalability and how to tweak queries and my node.js code to extract some extra performance. Here are some of my findings, from several standpoints (mongoDB itself, optimizations to the mongoose driver for Node, and node.js itself).

Mongoose Driver
1. Query optimization

A. Instead of using Model.findOne or Model.find and iterating, try to use Model.find().limit(). I encountered a several-fold speedup when doing this, and it is discussed in several other places online.

B. If you have excess CPU, you can return a bigger chunk of documents and process them using your server instead and free up some cycles for MongoDB.

Improvement: Large (saw peaks of 1500ms for reads in one collection using mongotop. Afterwards, saw this drop to 200ms)

Example:

//Before:
Collection.findOne(query3, function(err, doc) {
  //Returns 1 mongoose document
});

//After
Collection.find(query3).limit(1).exec(function(err, docs) {
  //returns an array of mongoose documents            
});

See these links for some more information: Checking if a document exists – MongoDB slow findOne vs find

2. Use lean()
According to the docs, if you query a collection with lean(), plain javascript objects are returned and not mongoose.Document. I’ve found that in many instances, where I was just reading the data and presenting it to the user via REST or a visual interface, there was no need for the mongoose document because there was no manipulation after the read query.

Additionally, for relational data, if you have for instance a Schema that contains an array of refs (e.g. friends: [{ type: mongoose.ObjectId, ref: 'User' }]), and you only need to return the first N friends to the user, you can use lean() to trim the returned javascript objects first and then populate only what is left, instead of populating the entire array of friends.

Improvement: Large (depending on how much data is returned)

Example:

//Before:
User.find(query, function(err, users) {
  //users will be mongoose Documents. Hence you can't add fields outside the Schema (unless you have a { type: Mixed } field)
  var options = {
     path: 'friends',
     model: 'User',
     select: 'first last'
  };
  //Model.populate takes the documents to populate as its first argument
  User.populate(users, options, function(err, populated) {
     //will populate ALL friends in the array
  });
});

//After
User.find(query).lean().exec(function(err, users) {
  //users will be javascript objects. Now you can go outside the schema and return data in line with what you need
  users.forEach(function(user) {
     user.friends = user.friends.slice(0, 10);  //take the first ten friends returned, or whatever
  });
  var options = {
     path: 'friends',
     model: 'User',
     select: 'first last'
  };
  User.populate(users, options, function(err, populated) {
     //now Model.populate populates a potentially much smaller array
  });
});

Results (Example on my node.js server using mongotop):

Load (ms)
Seconds      No lean()    lean()
5            561          524
10           371          303
15           310          295
20           573          563
25           292          291
30           302          291
35           544          520
40           316          307
45           289          286
50           537          503
Average      409.5        388.3

% improvement: 0.051770452 = 5.177%
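For reference, the averages and the improvement figure can be reproduced from the ten samples in the table. This is only the arithmetic; the variable and function names are made up for the example.

```javascript
// Load samples (ms) copied from the results table.
var noLean = [561, 371, 310, 573, 292, 302, 544, 316, 289, 537];
var withLean = [524, 303, 295, 563, 291, 291, 520, 307, 286, 503];

function mean(values) {
	var sum = values.reduce(function (total, value) { return total + value; }, 0);
	return sum / values.length;
}

var before = mean(noLean);
var after = mean(withLean);
var improvement = (before - after) / before;	// fraction of load time saved

console.log(before + ' ms -> ' + after + ' ms, ' + (improvement * 100).toFixed(2) + '% faster');
```

The improvement is measured relative to the no-lean() average, i.e. (409.5 - 388.3) / 409.5.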

3. Keep mongoDB “warm”.
MongoDB implements pretty good caching. This can be evidenced by running a query several times in quick succession. When this occurs, my experience has been that the query time decreases (sometimes dramatically so). For instance, a query can go from 50ms to 10ms after running twice. We have one collection that is constantly queried – about 500 times per second for reads and also 500 times per second for writes. Keeping this collection “warm”, i.e. running the query that will be called at some point in the future, can help keep the call responsive when Mongo starts to slow down.

Improvement: Untested
Example:

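function keepwarm() {
   setTimeout(function() {
      //a mongoose query is only sent to the server once exec() is called or a callback is passed
      User.find(query).exec(function(err) {
         if (err) console.log(err);
      });
      keepwarm();
   }, 500);
}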

Mongo Native
1. Compound indexing
For heavy duty queries that run often, I decided to create compound indices using all the parameters that comprised the query. Even though intuitively, it didn’t jump out to me that indexing by timestamp for instance would make a difference, it does. According to the mongoDB documentation, if your query sorts based on timestamp (which ours did), indexing by timestamp can actually help.

Improvement: Large (depending on how large in documents your collection is and how efficiently mongoDB can make use of your indices)
Example:

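//in mongo shell (ensureIndex is replaced by createIndex in MongoDB 3.0 and later)
db.collection.ensureIndex({'timestamp': 1, 'user': 1});

//in mongoose, index() is called on the Schema (here a variable named schema), not on the Model
schema.index({'timestamp': 1, 'user': 1});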

Alternative? Aggregating documents into larger documents, such as time slices. Intuitively, that would mean that queries don’t have to traverse as large an index to reach the targeted documents. You may ask what the difference is between creating a compound index and breaking the data down into aggregates like a day’s or an hour’s slice. Here are a few considerations:

  A. MongoDB tries to match up queries with indices or compound indices, but there’s no guarantee that this match will occur. Supposedly, the algorithm used to determine which index to use is pretty good, but I question how good it is if, for instance, the query you are using includes an additional parameter to search for. If MongoDB doesn’t see all parameters in the index, will it still know to use a compound index or a combination of compound indices?
  B. Using aggregates could actually be slower if it requires traversal of the document for the relevant flight data (which might not afford fast reads).
  C. If writes are very heavy for the aggregate (e.g. you use an aggregate document that is too large in scope), the constant reading and writing of the document may cause delays via mongoDB’s need to lock the collection/document.
  D. Aggregates could make indexing more difficult.
  E. Aggregates could make aggregation/mapreduce more difficult because your document no longer represents a single instance of an “event” (or is not granular enough).
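To show what a time-slice aggregate might look like, here is a minimal in-memory sketch that groups events into one object per user per hour. The document shape ({ user, start, events }) and the function name `bucketByHour` are assumptions for illustration only; they are not the schema I actually used.

```javascript
var HOUR_MS = 60 * 60 * 1000;

// Hypothetical shape: one aggregate per user per hour, holding that hour's raw events.
function bucketByHour(events) {
	var slices = {};
	events.forEach(function (event) {
		var sliceStart = Math.floor(event.timestamp / HOUR_MS) * HOUR_MS;	// round down to the hour
		var key = event.user + ':' + sliceStart;
		if (!slices[key]) {
			slices[key] = { user: event.user, start: sliceStart, events: [] };
		}
		slices[key].events.push(event);
	});
	return Object.keys(slices).map(function (key) { return slices[key]; });
}

var sample = [
	{ user: 'a', timestamp: 1409866055940 },
	{ user: 'a', timestamp: 1409866056190 },
	{ user: 'b', timestamp: 1409866056439 },
	{ user: 'a', timestamp: 1409866055940 + HOUR_MS }
];
var hourly = bucketByHour(sample);
console.log(hourly.length);	// four events collapse into three slice documents
```

With this layout a query for one user's hour touches a single document, but as points B and C note, reading one event means scanning the slice's events array, and every new event rewrites the same document.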

2. Use Mongotop to determine where your bottlenecks are.
Mongotop shows each collection in your database and the amount of time spent on reads and writes. By default it updates every second. Bad things happen when the total query time jumps over a second. For instance, in Node, that means that the event queue will begin to back up because mongo is taking too long.

Example:

 
//example output
                            ns       total        read       write		2014-07-31T17:02:06
              mean-dev.packets       282ms       282ms         0ms
             mean-dev.sessions         0ms         0ms         0ms
               mean-dev.series         0ms         0ms         0ms
              mean-dev.reduces         0ms         0ms         0ms
             mean-dev.projects         0ms         0ms         0ms

3. Use explain()… sparingly
I’ve found that explain is useful initially, because it will show you the number of scanned documents to reach the result of the query. However, when trying to optimize queries further, I found that it was not that useful. If I’ve already created my compound indices and MongoDB is using them, how can I extract further performance using explain() when explain() may already show a 0 – 1ms duration?

Example:

//in mongo shell
db.collection.find({
        $and: [{
            'from.ID': 956481854
        }, {
            'to.ID': 1038472857
        }, {
            'metadata.searchable': false
        }, {
            'to.IP_ADDRESS': '127.0.0.1'
        }, {
            'from.timestamp': {
                $lt: new Date(ISODate().getTime() - 1000 * 60 * 18)
            }
        }]
    }).explain()

4. For fast inserts for a collection of limited size, consider using a capped collection.

A capped collection in mongoDB is essentially a queue-like data structure that enforces first-in first-out. According to the mongoDB docs, capped collections maintain insertion order, so they’re perfect for time series. You just have to specify what the max size of the collection should be in bytes. I used an average based on: db.collection.stats(), where I found that each record was about 450 bytes in size.

To enforce this, you can run this in the mongoDB shell:

db.runCommand({"convertToCapped": "mycoll", size: 100000}); //size in bytes
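A quick back-of-the-envelope check helps when picking the size. The helpers below are hypothetical and only do the arithmetic with the ~450 byte average mentioned above; MongoDB's own allocation and per-record overhead will shift the real numbers, so treat the results as estimates.

```javascript
// Hypothetical helper: bytes needed to hold maxDocs documents of a given average size.
function cappedSizeBytes(avgDocBytes, maxDocs) {
	return avgDocBytes * maxDocs;
}

// Hypothetical helper: roughly how many documents fit in a capped collection of sizeBytes.
function docsThatFit(sizeBytes, avgDocBytes) {
	return Math.floor(sizeBytes / avgDocBytes);
}

console.log(docsThatFit(100000, 450));		// 222 records fit in the 100000-byte example
console.log(cappedSizeBytes(450, 100000));	// 45000000 bytes to keep 100,000 records
```

In other words, the 100000-byte size in the command only keeps a couple of hundred records of this size before the oldest ones are overwritten, so scale it to the time window you actually need.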

See mongoDB docs here:

Node.js
1. Implement pacing for large updates.
I’ve found that in situations where there is a periodic update on a large subset of a collection while many other updates are going on, the large update could cause the event queue in Node to back up as mongoDB tried to keep up. By throttling the number of updates that can go on based on total update time, I could adjust to the current load on the server. The philosophy is that if node/mongoDB have extra cycles, we can dial up the pace of backfilling/updates a bit, whereas when node/mongoDB is overloaded, we can back off.

Example:


//Runs periodically
    _aggregator.updateStatistics(undefined, updateStatisticsPace, function(result) {
          console.log('[AGGREGATOR] updateStatistics() complete.  Result: [Num Updated: %d, Duration: %d, Average (ms) per update: %d]', result.updated, result.duration, result.average);
          if (result.average < 5) {  //<5 ms, speed up by 10%
            updateStatisticsPace = Math.min(MAX_PACE, Math.floor(updateStatisticsPace * 1.1));    //MAX_PACE = all records updated
          } else if (result.average >= 5 && result.average < 10) { //5 <= ms < 10, maintain pace
            updateStatisticsPace = Math.min(MAX_PACE, updateStatisticsPace);
          } else {  //>= 10ms, slow down to about 2/3 of the current pace, but never below updateStatisticsPace_min
            updateStatisticsPace = Math.min(MAX_PACE, Math.max(updateStatisticsPace_min, Math.floor(updateStatisticsPace * .66)));
          }

          if (MAX_PACE === updateStatisticsPace) { console.log('[Aggregator] updateStatistics() - Max pace reached: ' + _count); }
          console.log('[AGGREGATOR] updateStatistics() Setting new pace: %d', updateStatisticsPace);
          callback(null, result)
    });
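The pacing rule is easier to reason about (and test) when pulled out into a pure function. The sketch below is a hypothetical refactoring, not code from the aggregator: `nextPace` and its parameter names are assumptions, while the thresholds (5 ms, 10 ms) and multipliers (1.1, 0.66) are the ones used in the snippet above.

```javascript
// Hypothetical pure version of the pacing rule.
// pace: current number of updates per run; averageMs: average time per update in the last run.
function nextPace(pace, averageMs, minPace, maxPace) {
	if (averageMs < 5) {
		// plenty of headroom: speed up by 10%, capped at maxPace
		return Math.min(maxPace, Math.floor(pace * 1.1));
	}
	if (averageMs < 10) {
		// acceptable: hold the current pace
		return Math.min(maxPace, pace);
	}
	// overloaded: drop to about 2/3 of the current pace, but never below minPace
	return Math.min(maxPace, Math.max(minPace, Math.floor(pace * 0.66)));
}

console.log(nextPace(100, 3, 10, 1000));	// 110
console.log(nextPace(100, 7, 10, 1000));	// 100
console.log(nextPace(100, 12, 10, 1000));	// 66
```

Because the increase is 10% and the decrease is roughly a third, the pace climbs slowly and backs off quickly, which is usually what you want when the database is the bottleneck.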

Dockumo – the crowd in the cloud


In the past few months, I’ve been working on a project in my spare time called Dockumo. Dockumo is a web-based tool that lets users create “Articles” or basically blurbs of text, edit them and create new versions of them easily. When a user edits an article, the user is given the option to “Save” it or “Save as New Version.” Saving as a new version will generate a “Series” for the article, which will then start keeping track of all versions of the article. Pretty basic stuff. It’s my first foray into making consumer-facing software, including making better software tools for lawyers (my ultimate goal).

Dockumo main page (for text comparison)

The utility here is that the user can see the article and versions through a timeline view and can easily compare any two versions (or any two articles for that matter) with a couple of clicks. I built this because comparison before required making two word documents, saving them, then comparing them and saving the result. Dockumo does this, but makes it much faster to do. Also, there’s no need to store lots of messy versions on your computer since it’s stored in the cloud. Furthermore, I built in some functionality to export the result to a text file, html doc or word doc (word was by far the most challenging). Exporting actually keeps track of the changes in “Track changes” format in word, which I implemented through a modification to the node.js officegen module.

An example of an article

Here’s what the timeline view looks like:

Timeline view of an article

Finally, Dockumo lets users group articles together in “Journals.” Journals are a way to get related content together and then to share it with your friends or other users on the site. When you share a journal, it becomes a “collaboration” for all users who are invited. Those collaborators can add new versions of any articles grouped in the journal. The idea behind this is that users will be able to view and edit data together, while keeping track of new versions. There’s an option, for instance, to receive a notification whenever a collaborator edits a series. So for example, if five users are collaborators on a journal and are working on a report or a term paper, each one can make his or her modifications and save as a new version. As the document evolves through time, each user can see which other user made the change and how the article changed over time through the comparison view.

A journal

What’s left?

When I started out making Dockumo, I only intended it to be a quick and easy way to diff two blurbs of text. I didn’t want feature creep to set in, but it inevitably did. Journals were not planned from the beginning, but I figured users would want a way to see relevant content. I ended up having to add lots of other elements as well, such as “friending” other users, user search, a notifications system, exporting to other formats, emailing users who want updates, and user permissions. All of these things added complexity and time. A good lesson for me though: When I thought I was able to launch the website, I was actually still about a month out. It wasn’t until I tried to launch that I realized there were so many niggling things left undone. For instance, not allowing users to register under the same username or email address, or providing a password reset mechanism. Fortunately, I can carry over a lot of this code to any future projects down the line.

And feature creep. I am probably the worst offender ever. I always think to myself, “well if the software doesn’t do [X], then users won’t want to use it!” For me, this is an ever-present temptation to continue adding feature upon feature to the source while never actually launching. Since launching, there have been other features that I really want to add (and probably will). For instance, I want to give users the option to make articles “public” and tag them with keywords so that the community (all users within the cloud) can search for a particular type of document (e.g. a cover letter or a thank you note), see an example of it, suggest modifications and rate existing articles. In other words, have a community of contributors who persistently improve cloud-based, crowd-sourced documents. That’s why I called the product “Dockumo”, or “doc” + “kumo” (“cloud” in Japanese).

Let me know what you guys think. I’m always happy to discuss improvements I can make or my technical/architectural decisions.

About page