Clustering in Node.js (Part 2) – Round-robin load balancing in a node cluster – notifying all workers

Here is part two of the round-robin load balancing post, which explains how to notify all workers depending on the type of data received. This is very simple to do in the default architecture laid out in Part 1. It provides a lot of utility because it allows each worker to maintain state based on the type or amount of work it has completed, while the Load Balancer itself remains stateless. Essentially it works like this:

  1. Inspect the contents of the data. If the data warrants a notification to all workers, then return the type of notification to be sent. If it doesn’t, don’t send any notification.
  2. If there is a notification to be sent to workers, send it to all workers before actually distributing the data.
  3. If a worker receives the notification, it will update its state before processing the data.
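
The three steps above can be sketched in isolation, without a real cluster. This is only a minimal sketch under stated assumptions, not the LoadBalancer from Part 1: `makeStubWorker`, `needsNotification` and `handleIncoming` are hypothetical names, the "RESET" rule is invented for illustration, and the stub workers simply record what they were sent.

```javascript
// Hypothetical stand-in for a cluster worker: it only records messages.
function makeStubWorker(id) {
    return {
        id: id,
        inbox: [],
        send: function (msg) { this.inbox.push(msg); }
    };
}

// Step 1: inspect the data and decide which notification type applies, if any.
// Assumption: payloads that start with "RESET" warrant a broadcast.
function needsNotification(data) {
    return String(data.data).indexOf('RESET') === 0 ? 'Reset' : undefined;
}

// Steps 2 and 3: notify every worker first, then hand the data to one worker.
function handleIncoming(balancer, data) {
    var type = needsNotification(data);
    if (type) {
        balancer.workers.forEach(function (worker) {
            worker.send({ type: 'notification', data: { type: type, source: balancer.name } });
        });
    }
    var target = balancer.workers[balancer.counter % balancer.workers.length];
    balancer.counter++;
    target.send(data);
}

var balancer = {
    name: 'SketchBalancer',
    counter: 0,
    workers: [makeStubWorker(0), makeStubWorker(1)]
};
handleIncoming(balancer, { data: 'RESET counters' });   // broadcast, then worker 0 gets the data
handleIncoming(balancer, { data: 'plain payload' });    // no broadcast, worker 1 gets the data
```

Both stub workers end up with the notification in their inbox ahead of any data, which is the ordering the list describes.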

It’s really that simple. To implement this, I added a prototypal method to the LoadBalancer class I had written, called broadcast(data). Broadcast essentially inspects the data and determines what type of notification to send, if any, to the workers.

/**
 * This method returns a notification type based on the data.  If a type
 * is returned, each worker will be sent a notification of that type,
 * so the workers need to implement notify() to take advantage of it.
 *
 * @method broadcast
 * @param {Object} data Data is of format { data: msg, rinfo: rinfo }
 * @return {Object} Returns a NotificationPrototype or undefined to disable
 */
MyLoadBalancer.prototype.broadcast = function(data) {
    if (needsNotification(data)) {
        return NotificationType;
    } else {
        return undefined;
    }
}

Next, it was necessary to modify the LoadBalancer to send the notification first if there was one to be sent. Essentially, for each data source/server implemented in LoadBalancer, I added this:

function serverWrapper() { ...
    c.on('data', function(chunk) {
        console.log('[%s] Receiving TCP data', self.name);
        // Wrap the raw chunk together with the sender's address
        var data = {
            remote_address: c.remoteAddress,
            data: chunk
        };
        self.counter++;

        // Broadcast first (if the data warrants it), then distribute as usual
        var notification = createNotification.call(self, self.broadcast(data), data);
        if (notification) {
            self.notify.call(self, self.workers, notification);
        }
        self.distribute.call(self, self.workers, data);
    });

    ...
}
/**
 * Default implementation of notify - notifies all workers about some data
 *
 * @method notify
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.  
 * @param {Object} notification The notification to send to every worker
 */
LoadBalancer.prototype.notify = function(workers, notification) {
	var self = this;
	// Iterate over a copy so that removing a dead worker does not skip the next one.
	// The try/catch sits inside the loop so that `worker` is in scope in the catch block.
	workers.slice().forEach(function(worker) {
		try {
			worker.send({
				type: 'notification',
				data: notification
			});
		} catch (e) {
			if (e instanceof WorkerDownError) {
				console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
				workers.splice(workers.indexOf(worker), 1);
			} else {
				throw e;
			}
		}
	});
};

/**
* Instantiate a notification if there is one
* @method createNotification
* @param {Function} BroadcastType The notification type
* @param {Object} data The data associated with the receiver
*/
function createNotification(BroadcastType, data) {
	if (!BroadcastType) {
		return undefined;
	} else {
		return new BroadcastType(BroadcastType.prototype.type, this.name, data);
	}
}

This code is really simple. When the data event is emitted by whichever server type is in use, the prototype’s broadcast method is called to determine whether a notification needs to be broadcast to each of the workers. If broadcast returns a notification type (‘BroadcastType’ here), then you’ll end up with an instance of the notification wrapping the data. That notification is then sent to each worker, and the worker can determine what to do with it.
If there isn’t any notification (i.e. it’s undefined), then the LoadBalancer will just proceed as normal and distribute the work. In this code, notifications are really simple, essentially just plain data objects wrapped in a function in javascript. Here’s an example:

'use strict'

/**
* Notification class is a wrapper for a javascript object that gets passed upon some event triggered by a 
* data source consumed by a Load Balancer
*
* @class Notification
* @constructor
* @param {String} type A string of the type (similar to an enum)
* @param {String} source The name of the source instantiating this notification
* @param {Object} data The datagram bundled with this notification
*/
var Notification = function(type, source, data) {
	this.type = type;
	this.source = source;
	this.data = data;
	this.created = new Date().getTime();
};

//Human-readable summary (a separate method; toString itself is not overridden)
Notification.prototype.stringify = function() {
	var str = '';
	str += 'Notification @: ' + this.created + ' from ' + this.source + ' of type ' + this.type;
	return str;
};

Notification.prototype.type = undefined;

module.exports = Notification;

//And in the worker class, implement this.notify
var Worker = function(worker, instance) { 
...
	this.notify = function(notification) {
		console.log('[%s:%d] Received notification from %s: %s', this.name, this.instance, notification.source, notification.type);
		switch(notification.type) {
			case 'Generic':
				break;
			case 'Example':
				break;
			default:
				break;
		}
	};
}

That’s pretty much all there is to it. Now when we receive data of a certain type, we can tell all workers to update a state variable. Neat!
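
To make the "update a state variable" part concrete, here is a minimal sketch of a worker-side object that changes its behaviour after a notification. It is not the WorkerPrototype from Part 1: `StatefulWorker`, `onMessage` and the 'Maintenance' / 'Resume' type names are assumptions made up for this illustration.

```javascript
// Hypothetical worker-side state holder; not the WorkerPrototype from Part 1.
function StatefulWorker(name, instance) {
    this.name = name;
    this.instance = instance;
    this.state = { mode: 'normal', notificationsSeen: 0 };
}

// Update local state according to the notification type.
StatefulWorker.prototype.notify = function (notification) {
    this.state.notificationsSeen++;
    switch (notification.type) {
        case 'Maintenance':   // assumed type name, for illustration only
            this.state.mode = 'maintenance';
            break;
        case 'Resume':        // assumed type name, for illustration only
            this.state.mode = 'normal';
            break;
        default:
            break;            // unknown types leave the mode untouched
    }
};

// Route messages the way a worker process could inside its 'message' handler:
// notifications update state, anything else is treated as work.
StatefulWorker.prototype.onMessage = function (msg) {
    if (msg && msg.type === 'notification') {
        this.notify(msg.data);
        return 'notified';
    }
    return this.state.mode === 'maintenance' ? 'skipped' : 'processed';
};

var w = new StatefulWorker('Generic Worker', 1);
var before = w.onMessage({ data: 'payload' });
w.onMessage({ type: 'notification', data: { type: 'Maintenance', source: 'GenericLoadBalancer' } });
var after = w.onMessage({ data: 'payload' });
```

The same payload is handled differently before and after the broadcast, while the balancer that sent it keeps no record of the worker's mode.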

[GenericLoadBalancer] Starting up Load Balancer on Port 5005 with 2 workers.
[GenericLoadBalancer] Server opening socket and listening on port 5005
[GenericLoadBalancer] Finished starting up.  Instantiating workers.
Instantiating Worker Generic Worker 0
Instantiating Worker Generic Worker 1
[GenericLoadBalancer] Received a message from 1.1.0.1:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[GenericLoadBalancer] Received a message from 1.2.0.2:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Doing work [0].
[Generic Worker:1] Doing work [0].

Clustering in Node.js (Part 1) – Round-robin load balancing in a node cluster

As we start to scale out our analytics service here, we’ve started thinking of ways to leverage node’s very good event-driven model across multiple cores and eventually multiple machines. Node clustering is still in its nascent stages, so there’s not a whole lot of functionality and it relies on OS-level forking to “scale out.” While this works in principle, it means that clustering in node is not native per se and that node’s cluster module essentially just provides a weak layer of abstraction across several node processes that are effectively launched from your command line.

Although node already implements load balancing for HTTP requests in its cluster module, it doesn’t handle load balancing for other protocols. Also, node’s load balancing algorithm had a few quirks because it relied on the OS’s scheme for waking up dormant processes (see http://strongloop.com/strongblog/whats-new-in-node-js-v0-12-cluster-round-robin-load-balancing/). Hence, in Node.js v0.11.2, the developers at Joyent decided to move back to a round-robin scheme. Round-robin is not terribly sophisticated and makes no judgment as to the type of work a worker is currently processing.  While it works well when each request requires roughly the same amount of work, it’s not so good when the incoming work varies in nature.  At least it’s conceptually simple and simple to implement.
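
The weakness with uneven work is easy to see with a little arithmetic. The sketch below is a toy model, not node's scheduler: `roundRobinLoad` is a hypothetical helper and the job costs are made-up numbers. It hands jobs out in strict rotation and totals the cost each worker ends up with.

```javascript
// Assign each job to the workers in strict rotation and total the cost per worker.
function roundRobinLoad(jobCosts, workerCount) {
    var load = [];
    for (var w = 0; w < workerCount; w++) {
        load.push(0);
    }
    jobCosts.forEach(function (cost, i) {
        load[i % workerCount] += cost;
    });
    return load;
}

// Equal-sized jobs: both workers end up with the same total.
var equalJobs = roundRobinLoad([5, 5, 5, 5, 5, 5], 2);

// Alternating heavy and light jobs: one worker receives every heavy job.
var mixedJobs = roundRobinLoad([50, 1, 50, 1, 50, 1], 2);

console.log(equalJobs, mixedJobs);
```

With the alternating input, the rotation happens to line up with the heavy jobs, so one worker carries almost all of the load even though both received the same number of requests.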

My first major stumbling block when using node’s cluster module was the fact that this load balancing wasn’t supported for UDP.  Furthermore, we receive analytics data using a range of protocols (e.g. SFTP, TCP/IP, FTP, etc.) so I didn’t want to rely on node’s native libraries in case no one thought it important enough to extend load balancing to a variety of protocols. Thus, I had to come up with my own generalized architecture for implementing load balancing. Have a look:


var dgram = require('dgram'),
	cluster = require('cluster'),
	WorkerPrototype = require('../workers/WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	net = require('net');

// Private Functions
var protocols = {
	'udp': {
		'start': listen_udp,
		'stop': shutdown_udp
	},
	'tcp_server': {
		'start': open_tcp_server,
		'stop': shutdown_tcp_server
	},
	'tcp_client': {
		'start': open_tcp_client,
		'stop': shutdown_tcp_client
	}
};

function listen(callback) {
	protocols[this.protocol].start.call(this, callback);
}

function shutdown(callback) {
	protocols[this.protocol].stop.call(this, callback);
}

//This function will restart the server every retry interval if restart is enabled
function restart(callback) {
	var self = this;
	console.log('[%s] Trying restart in %d ms ...', self.name, self.retrydelay);
	setTimeout(function() {
		self.retrydelay = Math.floor(self.retrydelay * 1.5);
		listen.call(self, callback);
	}, self.retrydelay );
}

//UDP Handling
function listen_udp(callback) {
	var self = this;
	var server = dgram.createSocket('udp4');
	server.on('error', function (err) {
        console.log('[%s] Error:\n' + err.stack, self.name);
        try {
      		server.close();
      	} catch (err) {
      		if (err) {
      			console.log('[%s] Error closing udp socket: %s', self.name, err);
      		}
      	} finally {
      		if (self.restart) {
	        	restart.call(self, callback);
	       	} else {	//otherwise, just calls callback
	        	if (callback)
	        		callback(err);
	        }
      	}
    });

    server.on('message', function (msg, rinfo) {
		console.log('[%s] Received a message from ' + rinfo.address + ':' + rinfo.port + ' @ [%s] of size [%d] bytes.', self.name, new Date(), msg.length);
		self.counter++;
		self.distribute.call(self, self.workers, {
			data: msg, 
			rinfo: rinfo
		});
	});

	server.bind(self.port, function() {
	    console.log('[%s] Server opening socket and listening on port ' + self.port, self.name);
	});

	server.on('listening', function () {
	  	var address = server.address();
	  	var msg = '[' + self.name + ']  SUCCESS! UDP socket now listening @ ' +
	      	address.address + ':' + address.port;
	  	if (callback) {
	  		self.retrydelay = self.restartdelay;
	  		callback(null, msg);
	  	}
	});

	self.server = server;
	return server;
}

function shutdown_udp(callback) {
	var self = this;
	if (self.server) {
		self.server.close();
		var msg = '[' + self.name + '] Load Balancer shutdown successfully.';
		if (callback)
			callback(null, msg);
		
	} else {
		var err = '[' + self.name + '] Load Balancer not defined!  Could not be shutdown.';
		if (callback)
			callback(err);
	}
}

//TCP handling - server and client implementations are omitted here
function open_tcp_server(callback) {
   //implementation goes here
}

function shutdown_tcp_server(callback) {
   //implementation goes here
}

function open_tcp_client(callback) {
   //implementation goes here
}

function shutdown_tcp_client(callback) {
   //implementation goes here
}

//Private Load Balancer
function instantiateWorkers() {
	for (var i = 0; i < this.instances; i++) {
		console.log('Instantiating Worker %s %d', this.worker_name, i);
		var worker = cluster.fork({
			name: this.worker_name,
			id: i
		});
	    this.workers.push(worker);
	}
}

/**
* Load Balancer is a class that wraps functionality for opening/closing a UDP or TCP connection
* It also implements default load balancing based on the number of workers specified.
*
* @class LoadBalancer
* @constructor
* @param {String} name The name for this load balancer
* @param {Number} port The port number
* @param {String} protocol Either udp or tcp at the moment, can be extended later on
* @param {String} worker_name Give each worker a name
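* @param {Number} instances The number of instances of the LoadBalancer to be "forked"; each forked instance acts only as a worker that receives messages
* @param {Boolean} retryonerror If true, will try to reopen the connection upon error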
*/
var LoadBalancer = function(name, port, protocol, worker_name, instances, retryonerror) {

	if (!port || !protocol) {
		throw new Error('Could not instantiate load balancer.  Port/Protocol/Workers undefined.');
	}

	if (instances <= 0) {
		throw new Error('Not enough worker instances for workers.  Failed to start.');
	}

	if (!protocols[protocol]) {
		//Error() does not apply printf-style formatting, so build the message by hand
		throw new Error('Could not instantiate load balancer.  Unknown protocol ' + protocol);
	}

	//Set by User
	this.name = name;
	this.port = port;
	this.protocol = protocol;
	this.worker_name = worker_name;
	this.instances = instances;
	this.restart = retryonerror ? retryonerror : false;	//if true, will try to reopen connection upon error
	this.restartdelay = 5000;	//will multiply by 1.5 each time and reset when connection is reestablished

	//Private members
	this.workers = [];
	this.server;
	this.counter = 0;
	this.retrydelay = this.restartdelay;

	var self=this;

	this.start = function(callback) {
		listen.call(self, function(err, msg) {
			self.counter = 0;	//reset counter
			if (!err)
				instantiateWorkers.call(self);

			if (callback) {
				callback(err, msg);
			}
		});
	}

	this.stop = function(callback) {
		shutdown.call(this, function(err, msg) {
			if (err) 
				console.log(err);
			if (callback)
				callback(err, msg);
		});
	}

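	//Named restartServer so that it does not overwrite the boolean this.restart flag set above
	this.restartServer = function() {
		//use self here: inside the callback, `this` no longer refers to the load balancer
		self.stop(function(err, msg) {
			if (err) {
				throw new Error(err);
			} else {
				self.start();
			}
		});
	}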
};

/**
 * Default implementation of distribute - sequential cycling through workers
 *
 * @method distribute
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.  
 * @param {Object} data An object containing the data received on the port for this Load Balancer.  Ex:  UDP would send data {data: msg, rinfo: rinfo}
 */
LoadBalancer.prototype.distribute = function(workers, data) {
	var self = this;
	var worker = workers[self.counter % workers.length];
	try {
		if (worker) {
			worker.send(data);
		}
	} catch (e) {
		if (e instanceof WorkerDownError) {
			console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
			workers.splice(workers.indexOf(worker), 1);
		} else {
			throw e;
		}
	}
};

module.exports = LoadBalancer;

Basically, this prototype does a few things:

    1. Implements generalized servers by protocol to receive data

You can see that when one instantiates or extends the load balancer prototype, the user will need to specify the protocol to use. Since things like ports and instances of workers are common to all load balancers, these are private attributes of the load balancer instance. Essentially, all servers under this model work in the same way. A master server is instantiated which receives all data over a socket. When the server receives a request or message, it will pass it off to one of N workers (specified by the user) which is running in its own process. That worker then does the work.

    2. Implements default methods to start, stop and restart servers

A user doesn’t need to concern herself with starting, stopping and restarting servers should a socket go down. That’s why there is some simple logic for reopening a socket upon error (e.g. if the socket is flooded and closes). The default mechanism is to try reopening the socket after 5 seconds, with the delay increasing by 50% each time the socket fails to open.
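
The retry schedule is easy to compute by hand. As a small sketch (the helper name `retryDelays` is hypothetical; the 5000 ms start and the `Math.floor(delay * 1.5)` growth come from the listing above), the first few waits look like this:

```javascript
// Mirror of the retry arithmetic: start at the initial delay, grow by 50% per failure.
function retryDelays(initialDelay, attempts) {
    var delays = [];
    var delay = initialDelay;
    for (var i = 0; i < attempts; i++) {
        delays.push(delay);
        delay = Math.floor(delay * 1.5);   // same rounding as the restart() helper
    }
    return delays;
}

var delays = retryDelays(5000, 5);
console.log(delays.join(' ms, ') + ' ms');
```

Note that nothing caps the delay, so a socket that stays down for a long time will be retried less and less often until a successful listen resets the delay.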

    3. Can take an arbitrary number of “Workers” to handle the load

The load balancer wouldn’t be of much use if it handled the entire workload by itself. This prototype (and extended versions of it) will actually instantiate itself N times. I found this to be a quirk of node: launching a separate process (via the cluster settings) instead of launching the same process N times prevented me from communicating with the worker processes. For example, I would launch the load balancer and pass in four worker processes that lived in separate javascript files. The main load balancer could not communicate with them, even when I passed a reference to the cluster to each worker process. The only way I could get communication to work was if the processes were forked from the same node cluster. I thought this behavior was really strange, and it made for some weird looking code (i.e. the default implementation of node clustering, which contains a massive if statement in a single javascript file that represents not only the cluster but also all of its workers).

    4. Offers a generic round-robin load balancer irrespective of protocol

The distribute function bound to the load balancer prototype is called whenever the data is received by the master process. I added a default implementation so users wouldn’t have to implement their own method using the prototype out of the box, but it would be easy to overwrite. If you, as the user, wanted to implement a different type of load balancing, you would just need to bind a new distribute() function to the prototype and things would work out of the box.
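
As an illustration of swapping in a different strategy, here is a sketch that overrides distribute() so that every sender always lands on the same worker. Everything in it is an assumption made for the example: `SketchBalancer` is a cut-down stand-in for the real prototype, and `StickyBalancer`, `hashAddress` and `stubWorker` are hypothetical names.

```javascript
// Minimal stand-in for the LoadBalancer prototype (assumption: only the
// fields that distribute() touches are modelled here).
function SketchBalancer(workers) {
    this.workers = workers;
    this.counter = 0;
}
SketchBalancer.prototype.distribute = function (workers, data) {
    workers[this.counter % workers.length].send(data);
};

// Subclass that keeps every sender on the same worker.
function StickyBalancer(workers) {
    SketchBalancer.call(this, workers);
}
StickyBalancer.prototype = Object.create(SketchBalancer.prototype);
StickyBalancer.prototype.constructor = StickyBalancer;

// Simple string hash; any stable hash would do.
function hashAddress(address) {
    var hash = 0;
    for (var i = 0; i < address.length; i++) {
        hash = (hash * 31 + address.charCodeAt(i)) >>> 0;
    }
    return hash;
}

// Bind a new distribute() to the prototype: pick the worker from the sender address.
StickyBalancer.prototype.distribute = function (workers, data) {
    workers[hashAddress(data.rinfo.address) % workers.length].send(data);
};

// Stub worker that records what it receives instead of doing real work.
function stubWorker() {
    return { received: [], send: function (msg) { this.received.push(msg); } };
}

var sticky = new StickyBalancer([stubWorker(), stubWorker(), stubWorker()]);
['1.1.0.1', '1.1.0.2', '1.1.0.1', '1.1.0.1'].forEach(function (address) {
    sticky.distribute(sticky.workers, { data: 'payload', rinfo: { address: address, port: 65432 } });
});
```

The trade-off is the usual one for hash-based routing: per-sender state stays on one worker, but a few very chatty senders can overload it.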

The last implementation step for the user is to extend the load balancer and specify what the workers do whenever data is received across the wire. A default implementation might look like this:

var cluster = require('cluster'),
	LoadBalancer = require('./LoadBalancerPrototype.js'),
	UDPWorker = require('../workers/UDPWorker.js');

var UDPLoadBalancer = function(name, port, worker_name, instances, retryonerror) {
	LoadBalancer.call(this, name, port, 'udp', worker_name, instances, retryonerror);

	console.log('Instantiating UDP Load Balancer on port %d with %d workers', port, instances);
};

UDPLoadBalancer.prototype = Object.create(LoadBalancer.prototype);
UDPLoadBalancer.prototype.name = 'UDP Load Balancer';
//CONFIG: To be set by the user, define the type of worker
UDPLoadBalancer.prototype.worker = UDPWorker;


// Custom distribution function - sequential.
// Overwrite this function to specify how you want to distribute the load.
// parameters msg, rinfo change depending on the protocol of the Load Balancer
// This is just an example - it is not strictly necessary

// UDPLoadBalancer.prototype.distribute = function(workers, msg, rinfo) {
// };


if (cluster.isMaster) {
	var port = 5005, instances = 2, retry = false;
	for (var i = 0; i < process.argv.length; i++) {
		if (process.argv[i] == '-p') {
			//command line arguments are strings, so convert to a number
			port = process.argv[i+1] ? parseInt(process.argv[i+1], 10) : port;
		}

		if (process.argv[i] == '-i') {
			instances = process.argv[i+1] ? parseInt(process.argv[i+1], 10) : instances;
		}

		if (process.argv[i] == '-r') {
			retry = true;
		}
	}

	//pass the parsed -r flag through instead of hard-coding true
	var udpLoadBalancer = new UDPLoadBalancer('UDP Balancer 1', port, UDPWorker.prototype.name, instances, retry);
	udpLoadBalancer.start(function(err, msg) {
		if (err) {
			console.log(UDPLoadBalancer.prototype.name + ' error starting up: ' + err);
			udpLoadBalancer.stop(function(err, msg) {
				console.log('UDP load balancer shut down: ' + err + '   ' + msg);
			});
		} else {
			//successful
		}
	});

} else {
	var worker = new UDPLoadBalancer.prototype.worker(cluster.worker, cluster.worker.id);

	cluster.worker.on('online', function() {
		console.log('[%s:%d] is online.', worker.name, worker.instance);
	})
	//msg is of format {data: udp_msg, rinfo: rinfo}
	cluster.worker.on('message', function(msg) {
		worker.doWork(msg.data, msg.rinfo);
	});
}

module.exports = UDPLoadBalancer;

And here’s an example implementation of a worker that handles the load:

var WorkerPrototype = require('./WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	Decoder = require('../controllers/Decoder.js');

var UDPWorker = function(worker, instance) {
	WorkerPrototype.call(this, worker, UDPWorker.prototype.name, instance);

        //implementations need to specify what work to do upon receiving a message
	this.doWork = function(msg, rinfo) {
		if (!this.worker || this.worker.suicide === true) {
			throw new WorkerDownError('[%s:%d] Worker is down. Not doing work.', this.name, this.instance);
		}
		console.log('[%s:%d] Doing work [%d].', this.name, this.instance, this.counter);
	}
};

UDPWorker.prototype.name = 'UDP Worker';
UDPWorker.prototype.getPath = function () { 
	var fullPath = __filename; 
	return fullPath; 
}

module.exports = UDPWorker;

That’s pretty much all there is to it. You can see an example of messages being sent across the wire and how load is distributed for UDP:

Instantiating Worker UDP Worker 0
Instantiating Worker UDP Worker 1
Instantiating Worker UDP Worker 2
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:35 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866055940.
[UDP Balancer 1] Received a message from 1.1.0.2:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056190.
[UDP Balancer 1] Received a message from 1.1.0.3:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056439.
[UDP Balancer 1] Received a message from 1.1.0.4:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056687.
[UDP Balancer 1] Received a message from 1.1.0.5:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056937.
[UDP Balancer 1] Received a message from 1.1.0.6:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866057187.
[UDP Balancer 1] Received a message from 1.2.0.1:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057214.
[UDP Balancer 1] Received a message from 1.1.0.7:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057437.
[UDP Balancer 1] Received a message from 1.2.0.2:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057463.
[UDP Balancer 1] Received a message from 1.1.0.8:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057687.
[UDP Balancer 1] Received a message from 1.2.0.3:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057714.
[UDP Balancer 1] Received a message from 1.1.0.9:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057938.
[UDP Balancer 1] Received a message from 1.2.0.4:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866057965.
[UDP Balancer 1] Received a message from 1.1.0.10:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058188.
[UDP Balancer 1] Received a message from 1.2.0.5:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058215.
[UDP Balancer 1] Received a message from 1.1.0.11:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058437.
[UDP Balancer 1] Received a message from 1.2.0.6:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058465.
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058655.
[UDP Balancer 1] Received a message from 1.1.0.12:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [6].
[DECODER]: Decoding udp message with timestamp: 1409866058689.
[UDP Balancer 1] Received a message from 1.2.0.7:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [6].

Mongoose / MongoDB performance enhancements and tweaks (Part 3)

[Image: bush_doing_it_wrong_1]

Holy crap. MongoDB native drivers are SO much faster than updates through mongoose’s ORM.  Initially, when I set out on this quest to enhance mongoDB performance with node.js, I thought that modifying my queries and limiting the number of results returned would be sufficient to scale. I was wrong (thanks Bush for the imagery). It turns out that the overhead that mongoose adds for wrapping mongoDB documents within mongoose’s ORM is tremendous. Well, I should say tremendous for our use case.

In my previous two posts of tweaking mongoDB / mongoose for performance enhancements (Part 1 and Part 2), I discussed optimization of queries or making simple writes instead of reads. These were worthwhile improvements and the speed difference eventually added up to significant chunks, but I had no idea moving to the native driver would give me these types of improvements (See below).

Example #1: ~400 streams, insertion times.
These numbers are from after I made the initial tweaks after Part 1. Unfortunately, I don’t really have that good of a printout from mongotop, but this kind of gives you an idea. Look at the write times for streams and packets, flowing at a rate of ~400 streams. This is for 400 sources of packets, which all gets written and persisted. Here you can see the write time to streams is @ 193ms / 400 streams or 48.25 ms / 100 streams. Likewise, packet writing is 7.25 ms / 100 streams. (You can mostly ignore read time, these are used for data aggregates and computing analytics). Compare these with the results below:

ns         total    read     write
streams    193ms    0ms      193ms
packets    30ms     1ms      29ms
devices    9ms      9ms      0ms

Example #2: ~1000 streams, insertion times.
You can see here that write time has dropped significantly. Writes to the packets collection are hovering at around 1.7 ms / 100 streams (17 ms for 1000 streams), and writes to the streams collection hover at around 7.6 ms / 100 streams. Respectively, that’s roughly a 425% and a 635% improvement in write times to the packets collection and streams collection (the ratios 7.25 / 1.7 and 48.25 / 7.6). And don’t forget, I had already begun the optimizations to mongoose. Even after the tweaks I made in Part 2, these numbers still represent a better than 100% improvement to query times. Huge, right?

ns         total    read     write
packets    186ms    169ms    17ms
devices    161ms    159ms    2ms
streams    97ms     21ms     76ms
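
For anyone who wants to check the per-100-streams figures, the arithmetic is short enough to script. This is just a worked calculation over the two mongotop printouts above; the helper name `writeMsPer100Streams` is made up for the example.

```javascript
// Normalise a mongotop write figure to "ms per 100 streams".
function writeMsPer100Streams(writeMs, streams) {
    return (writeMs * 100) / streams;
}

// Example #1: ~400 streams through mongoose.
var before = {
    streams: writeMsPer100Streams(193, 400),
    packets: writeMsPer100Streams(29, 400)
};

// Example #2: ~1000 streams through the native driver.
var after = {
    streams: writeMsPer100Streams(76, 1000),
    packets: writeMsPer100Streams(17, 1000)
};

// Ratio of old to new write time per 100 streams.
var speedup = {
    streams: before.streams / after.streams,
    packets: before.packets / after.packets
};

console.log(before, after, speedup);
```

The ratios come out at a little over 6.3 for streams and a little under 4.3 for packets, which is where the percentages quoted above come from.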

I knew using the mongoDB native drivers would be faster, but I hadn’t guessed that they would be this much faster.  

To make these changes, I updated mongoose to the latest version 3.8.14, which enables queries to be made using the native mongoDB driver released by 10gen (github here: https://github.com/mongodb/node-mongodb-native) via Model.collection methods.  These in turn call methods defined in node_modules/mongodb/lib/mongodb/collection/core.js, which essentially just execute raw commands in mongo. Using these native commands, one can take advantage of things like bulk inserts.

I still like mongoose, because it helps instantiate the same object whenever you need to create and save something. If something isn’t defined in the mongoose.Schema, that object won’t get persisted to mongoDB either. Furthermore, it can still be tuned to be semi-quick, so it all depends on the use case. It just so happens that when you’re inserting raw json into mongoDB or don’t need the validation and other middleware that mongoose provides, you can use the mongoDB native drivers while still using mongoose for the good stuff. That’s cool.

Here’s what the new improvements look like:

    var Stream = mongoose.model('Stream');
    async.waterfall([
        //Native mongoDB update returns # docs updated, update by default updates 1 document.
        //The step is named tryQuery1 so that it does not shadow the query1 object.
        function tryQuery1 (callback) {
            Stream.collection.update(query1, query1set, {safe: true}, function(err, writeResult) {
                if (err) return callback(err);
                if (writeResult == 1) {
                    //a non-null first argument stops the waterfall
                    callback('Found and updated call @ Query1');
                } else {
                    callback(null);
                }
            });
        },
        function tryQuery2 (callback) {
            Stream.collection.update(query2, query2set, {safe: true}, function(err, writeResult) {
                if (err) return callback(err);
                if (writeResult == 1) {
                    callback('Found and updated stream @ Query2');
                } else {
                    pushNewStream(packet, cb);
                    callback('No stream found.  Pushing new stream.');
                }
            });

        }
    ], function(err, results) {});

Mongoose / MongoDB speed improvements (Part 2)

In a previous post, MongoDB performance enhancements and tweaks, I described some techniques I’ve found to speed up mongoose inserts and updates on large volume performance statistic data. I was still seeing performance bottlenecks in MongoDB, especially after running a node cluster for our data analytics. Since node is now spread to multiple cores (scaling “horizontally”, to be described in another post), the writes generally come to MongoDB much faster. The main question for me was whether it was mongoose, the node-mongoDB driver or mongo per se slowing me down.

The problem:

When we get performance data, we start tracking it by the creation of a “stream” object. The stream object is first created with a unique identifier, then subsequent packets that come in update the stream. The streams get the highest packet value of the incoming packet and update their timestamp with the packet’s timestamp. Later on when we stop seeing packets flow for a particular stream, we time it out so analytics can be computed for all packets that came in from the beginning of the stream to the end of the stream.
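
The lifecycle described here (create on first packet, bump the high-water mark and timestamp on later packets, time out when the packets stop) can be sketched in memory. This is only an illustration of the idea, not our persistence code: `StreamTracker`, the `streamId` and `timestamp` fields and the 30 second timeout are all assumptions made up for the example.

```javascript
// In-memory sketch of the stream lifecycle (hypothetical; the real data lives in MongoDB).
function StreamTracker(timeoutMs) {
    this.timeoutMs = timeoutMs;
    this.streams = {};   // keyed by a unique stream identifier
}

// Create the stream on first sight, otherwise bump its high-water mark and timestamp.
StreamTracker.prototype.onPacket = function (packet) {
    var stream = this.streams[packet.streamId];
    if (!stream) {
        stream = this.streams[packet.streamId] = {
            highestPacketNum: packet.highestPacketNum,
            lastSeen: packet.timestamp,
            ended: false
        };
        return stream;
    }
    if (packet.highestPacketNum > stream.highestPacketNum) {
        stream.highestPacketNum = packet.highestPacketNum;
        stream.lastSeen = packet.timestamp;
    }
    return stream;
};

// Mark streams as ended once no packet has arrived within the timeout window.
StreamTracker.prototype.timeOut = function (now) {
    var ended = [];
    Object.keys(this.streams).forEach(function (id) {
        var stream = this.streams[id];
        if (!stream.ended && now - stream.lastSeen > this.timeoutMs) {
            stream.ended = true;
            ended.push(id);
        }
    }, this);
    return ended;
};

var tracker = new StreamTracker(30000);
tracker.onPacket({ streamId: 'a', highestPacketNum: 1, timestamp: 1000 });
tracker.onPacket({ streamId: 'a', highestPacketNum: 7, timestamp: 2000 });
tracker.onPacket({ streamId: 'b', highestPacketNum: 3, timestamp: 40000 });
var endedStreams = tracker.timeOut(45000);
```

In the real system each of those steps is a MongoDB read or write, which is why the cost of finding the matching stream dominates everything that follows.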

My initial implementation made a series of read queries using mongoose find(query), returned all potential matching streams, then updated the matching stream. The source code looked something like this.

function updateStream(packet, cb) {
   var stream = mongoose.model('Stream');
   var query1 = {
        $and: [{
            'from.ID': packet.fromID
        }, {
            'to.ID': packet.toID
        }, {
            'stream_ended.from': false
        }, {
            'from.IP_ADDRESS': packet.IP
        }, {
            'from.highestPacketNum': {
                $lt: packet.highestPacketNum
            }
        }]
    };

   //Since streams are bilateral, we have to do two query reads in order to find the matching stream
   var query2 = {
        $and: [{
            'to.ID': packet.toID
        }, {
            'from.ID': packet.fromID
        }, {
            'stream_ended.to': false
        }, {
            'to.IP_ADDRESS': packet.IP
        }, {
            'to.highestPacketNum': {
                $lt: packet.highestPacketNum
            }
        }]
    };  

   async.waterfall([
      function (callback) {
         stream.find(query1).exec(function(err, calls) {
            if (err) {
               return callback(err);
            }
            if (calls.length > 1) {  //there should only be 1 stream
               callback(new Error('There should only be one stream with this unique identifier'));
            } else if (calls.length == 1) {
               //update calls[0], then persist it
               calls[0].save(cb);
               callback('Found and updated stream @ Query1');  //a non-null first argument stops the waterfall
            } else {
               callback(null);  //Query1 yielded no results, continue down the waterfall
            }
         });
      },
      function (callback) {
         stream.find(query2).exec(function(err, calls) {
            if (err) {
               return callback(err);
            }
            if (calls.length > 1) {  //there should only be 1 stream
               callback(new Error('There should only be one stream with this unique identifier'));
            } else if (calls.length == 1) {
               //update calls[0], then persist it
               calls[0].save(cb);
               callback('Found and updated stream @ Query2');
            } else {
               callback(new Error('Query2 yielded no results'));
            }
         });
      }
   ], function (err) {
      //Strings only mark an early exit after a save was started; real errors go to the caller
      if (err instanceof Error) {
         cb(err);
      }
   });
}

You can see that this source code was highly inefficient because mongoose was returning all possible matches for the stream. This was due to a limitation of our packet simulator, which at the time did not spoof unique IDs in the packets it sent. At this point, we were capped at around 250 simultaneous streams, running in a single node process.

Result: ~250 simultaneous streams of data

Improvement #1: Limit the search to the first object found, update it and persist it.

Essentially the source code remained the same, but the mongoose queries changed from find(query1).exec() to find(query1).limit(1).exec(). With this change, we saw an improvement of around 50%, since the query would return after finding the first match. At this point, the bottleneck shifted back to node.js, and I noticed that at a certain point the event queue would back up from handling too many events. At the time, node.js was responsible for aggregation, decoding the packets, invoking mongoose to store them, running our REST API, and serving up static data. I saw my poor little node.js process pegged at 100% CPU trying to churn through all the data. One good thing I noticed, though, is that even though node.js was maxed out in terms of resources, it still continued to churn through the data, eventually processing everything given enough time.

Result: ~350 simultaneous streams of data, query time improved by about 50%
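As a loose in-memory analogy for why stopping at the first match helps (this is not how mongoose or MongoDB is implemented, and findAll and findFirst are hypothetical helpers), compare how many documents each approach has to look at:

```javascript
// In-memory analogy only: `docs` stands in for a collection, `matches` for the query.
function findAll(docs, matches) {
    var examined = 0;
    var found = [];
    docs.forEach(function (doc) {
        examined++;
        if (matches(doc)) {
            found.push(doc);
        }
    });
    return { found: found, examined: examined };
}

function findFirst(docs, matches) {
    var examined = 0;
    for (var i = 0; i < docs.length; i++) {
        examined++;
        if (matches(docs[i])) {
            return { found: [docs[i]], examined: examined };  // stop at the first match
        }
    }
    return { found: [], examined: examined };
}

var docs = [{ id: 1 }, { id: 2 }, { id: 2 }, { id: 3 }];
var isTwo = function (doc) { return doc.id === 2; };
var all = findAll(docs, isTwo);      // looks at every document
var first = findFirst(docs, isTwo);  // stops as soon as one matches
```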

Improvement #2: Clustering node.js

This deserves a post in itself since it required breaking out the various services that the single node process was handling into multiple forked processes, each doing its own thing. Suffice it to say, I eventually decided to fork the stream processors into N instances, where N is the number of cores on the machine / 2, with basic load balancing. This caused node to write to mongoDB much faster, and caused delays which eventually bogged mongoDB down. Thus the pendulum swung back to mongoDB.

Result: ~400 simultaneous streams of data, query time remains the same, mongoDB topped out.
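A minimal sketch of the sizing and the “basic load balancing” mentioned above, assuming plain round-robin dispatch. The helpers workerCount and roundRobin are hypothetical, and the workers here are stand-in objects; in a real setup they would be the objects returned by cluster.fork() or child_process.fork(), which both expose send().

```javascript
var os = require('os');

// Number of stream processors: half the cores, but always at least one.
function workerCount(cores) {
    return Math.max(1, Math.floor(cores / 2));
}

// Returns a dispatch(packet) function that hands packets to workers in turn.
// Each worker only needs a send(message) method.
function roundRobin(workers) {
    var next = 0;
    return function dispatch(packet) {
        var worker = workers[next];
        next = (next + 1) % workers.length;
        worker.send(packet);
        return worker;
    };
}

// Usage with stand-in workers (a real setup would fork processes instead).
var received = [];
var workers = [];
for (var i = 0; i < workerCount(os.cpus().length); i++) {
    workers.push({ id: i, send: function (msg) { received.push(msg); } });
}
var dispatch = roundRobin(workers);
dispatch({ seq: 1 });
dispatch({ seq: 2 });
```

The dispatcher itself keeps no per-stream state, which is what lets each worker own its streams while the balancer stays simple.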

Improvement #3: mongoDB updates in place

Finally, I realized that there was no reason to actually get the stream object returned in our source code, since I was just making updates to it. I had also noticed that it was actually the read time in mongotop that was spiking when the number of streams increased. This was because the find() functions in mongoose return a mongoose wrapped mongoDB document, so the update does not happen in place. For simple updates without much logic, there is no point to getting the object back, even when using the .lean() option to get json back. Here was the update:

function updateStream(packet, cb) {
    var Stream = mongoose.model('Stream');
    //queries remain the same ...

    async.waterfall([
        function(callback) {
            Stream.findOneAndUpdate(query1, {$set: { 'endTime': packet.timestamp, 'metadata.lastUpdated': new Date(), 'from.highestPacketNum': packet.highestPacketNum }}, {new: false}).exec(function(err, doc) {
                if (doc) {
                    cb(err, doc);
                    callback('Found and updated stream @ Query1');
                } else {
                    callback(null);
                }
            });
        },

        //No matching yet with IP, so try with just swapped SSRCs and update that one
        function(callback) {
            Stream.findOneAndUpdate(query2, {$set: { 'to.IP_ADDRESS': packet.IP, 'endTime': packet.timestamp, 'metadata.lastUpdated': new Date(), 'to.highestPacketNum': packet.highestPacketNum }}, {new: false}).exec(function(err, doc) {
                if (doc) {
                    cb(err, doc);
                    callback('Found and updated stream @ Query2');
                } else {
                    createNewStream(packet, cb);
                    callback('No streams found.  Pushing new stream.');
                }
            });
        }
    ], function(err, results) {

    });
}

It turns out that after this improvement, I saw in mongotop that read time dropped to 0ms per second, and write time slightly spiked. However, this was by far the biggest improvement in overall query time.

Result: ~600 simultaneous streams of data, query time dropped by 100 – 200% (when including read time dropping to 0).

This, combined with the stream processors running on multiple cores, seemed to be a scalable solution that would significantly raise our capacity. However, I noticed that at around 600 simultaneous streams of data, our stream creation would suddenly spike and continue increasing.

Improvement #4: MongoDB upgrade to 2.6.x and query improvement using $max field update operator

For all you readers who like mysteries, can you guess what was causing the stream creation to spike? I spent a long time thinking about it. For one, mongotop didn’t seem to be topped out in terms of the total query time on the streams collection. I noticed spikes of up to 400 or so ms for total query time, but it seemed by and large fine. Node.js was running comfortably at around 40% – 50% cpu usage per core on each of the four cores. So if everything seemed fine, why was stream creation spiking?

The answer, it turns out, was a race condition: the second packet of a stream was being processed before the first packet had been used to instantiate a new stream object. At a certain point, when enough simultaneous streams were incoming, the delay in creating the new stream would exceed the interval between the first and second packets of that same stream. Hence, everything after this point created a new stream.
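The race can be reproduced with a small deterministic simulation. This is only a sketch under stated assumptions: countCreatedStreams, the tick-based clock and the createDelay parameter are made up for illustration, and a created stream is assumed to become visible to lookups only createDelay ticks after the packet that triggered it.

```javascript
// Deterministic simulation of the race; all names are hypothetical.
// Each packet is { streamId, arrival } where arrival is a tick number.
function countCreatedStreams(packets, createDelay) {
    var visibleAt = {};   // streamId -> tick at which the first created stream becomes visible
    var created = 0;
    packets
        .slice()
        .sort(function (a, b) { return a.arrival - b.arrival; })
        .forEach(function (packet) {
            var ready = visibleAt[packet.streamId];
            if (ready !== undefined && ready <= packet.arrival) {
                return;   // stream found: this packet just updates it
            }
            // No visible stream yet, so this packet triggers a creation.
            created++;
            if (ready === undefined) {
                visibleAt[packet.streamId] = packet.arrival + createDelay;
            }
        });
    return created;
}

// One stream whose packets arrive 20 ticks apart.
var packets = [
    { streamId: 's', arrival: 0 },
    { streamId: 's', arrival: 20 },
    { streamId: 's', arrival: 40 }
];
var fastCreate = countCreatedStreams(packets, 5);   // creation finishes before packet 2: 1 stream
var slowCreate = countCreatedStreams(packets, 30);  // packet 2 arrives too early: 2 streams
```

Once the creation delay grows past the gap between packets, the count of created streams exceeds the number of real streams, which matches the spike described above.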

I thought for a while about a solution, but I got hung up on choosing between synchronous processing of the incoming packets, which would significantly decrease our throughput, and a “store and forward” approach where a reconciliation process would go back and match up streams. To be fair, I still have this problem, but I was able to reduce its occurrence significantly. Because there’s no guarantee that we handle the packets in order, I updated our query to make use of the $max field update operator, which only updates the stream’s highest packet number if a packet with the same IDs and a higher packet number comes in. This in turn let us simplify the query, because I no longer had to search for a stream with a lower packet number than the incoming packet. After this update, I noticed that the simpler query significantly reduced the total query time on the collection and at the same time helped with the race condition issue.

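function updateStream(packet, cb) {
    var Call = mongoose.model('Stream');  //assumption: Call is the same mongoose model as Stream above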
    var query1 = {
        $and: [{
            'from.ID': packet.fromID
        }, {
            'to.ID': packet.toID
        }, {
            'stream_ended.from': false
        }, {
            'from.IP_ADDRESS': packet.IP
        }]
    };

    var query2 = {
        $and: [{
            'to.ID': packet.fromID
        }, {
            'from.ID': packet.toID
        }, {
            'stream_ended.to': false
        }]
    };

    async.waterfall([

        //First see if there is a call object
        function(callback) {
            Call.findOneAndUpdate(query1, {
                $set: { 'endTime': packet.timestamp, 
                        'metadata.lastUpdated': new Date()
                    },
                $max: {
                    'from.highestPacketNum': packet.highestPacketNum
                }
            }, {new: false}).exec(function(err, doc) {
                if (doc) {
                    cb(err, doc);
                    callback('Found and updated stream @ Query1');
                } else {
                    callback(null);
                }
            });
        },

        function(callback) {
            Call.findOneAndUpdate(query2, {
                $set: { 'to.IP_ADDRESS': packet.IP, 
                        'endTime': packet.timestamp, 
                        'metadata.lastUpdated': new Date()
                    },
                $max: {
                    'to.highestPacketNum': packet.highestPacketNum
                }
            }, {new: false}).exec(function(err, doc) {
                if (doc) {
                    cb(err, doc);
                    callback('Found and updated stream @ Query2');
                } else {
                    pushNewStream(packet, cb);
                    callback('No stream found.  Pushing new stream.');
                }
            });
        }
    ], function(err, results) {

    });
    return true;
}
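To see why $max tolerates out-of-order packets, here is a plain-object sketch of its documented behavior: the field is overwritten only when the incoming value is greater than the stored one, and it is set if it does not exist yet. applyMax is a hypothetical helper for illustration, not a MongoDB or mongoose API.

```javascript
// Plain-object sketch of what $max does to a single field.
function applyMax(doc, field, value) {
    if (doc[field] === undefined || value > doc[field]) {
        doc[field] = value;
    }
    return doc;
}

// Packets 3, 1 and 2 arrive out of order; the stored maximum never moves backwards.
var stream = { highestPacketNum: 0 };
[3, 1, 2].forEach(function (packetNum) {
    applyMax(stream, 'highestPacketNum', packetNum);
});
// stream.highestPacketNum is now 3
```

Because the comparison happens inside the update itself, the query no longer needs a $lt condition on the packet number to protect the stored value.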

Note that the threshold for the race condition is just higher with this approach and not completely solved. If enough streams come in, I’ll still eventually get the race condition where the stream is not instantiated before the second packet is being processed. I’m still not quite sure what the best solution is here, but as with everything, improvement is an incremental process.

Result: ~1000 simultaneous streams, query time dropped by 100%, 4 cores running at 40% – 50% cpu.

Dockumo – the crowd in the cloud

In the past few months, I’ve been working on a project in my spare time called Dockumo. Dockumo is a web-based tool that lets users create “Articles” or basically blurbs of text, edit them and create new versions of them easily. When a user edits an article, the user is given the option to “Save” it or “Save as New Version.” Saving as a new version will generate a “Series” for the article, which will then start keeping track of all versions of the article. Pretty basic stuff. It’s my first foray into making consumer-facing software, including making better software tools for lawyers (my ultimate goal).

Dockumo main page (for text comparison)

The utility here is that the user can see the article and its versions through a timeline view and can easily compare any two versions (or any two articles for that matter) with a couple of clicks. I built this because comparison previously required making two Word documents, saving them, then comparing them and saving the result. Dockumo does this, but much faster. Also, there’s no need to store lots of messy versions on your computer since everything is stored in the cloud. Furthermore, I built in some functionality to export the result to a text file, HTML doc or Word doc (Word was by far the most challenging). Exporting actually keeps track of the changes in “Track changes” format in Word, which I implemented through a modification to the node.js officegen module.

An example of an article

Here’s what the timeline view looks like:

Timeline view of an article

Finally, Dockumo lets users group articles together in “Journals.” Journals are a way to gather related content and then share it with your friends or other users on the site. When you share a journal, it becomes a “collaboration” for all users who are invited. Those collaborators can add new versions of any articles grouped in the journal. The idea behind this is that users will be able to view and edit data together, while keeping track of new versions. There’s an option, for instance, to receive a notification whenever a collaborator edits a series. So for example, if five users collaborating on a journal are working on a report or a term paper, each one can make his or her modifications and save them as a new version. As the document evolves through time, each user can see which other user made a change and how the article changed over time through the comparison view.

A journal

What’s left?

When I started out making Dockumo, I only intended it to be a quick and easy way to diff two blurbs of text. I didn’t want feature creep to set in, but it inevitably did. Journals were not planned from the beginning, but I figured users would want a way to see relevant content. I ended up having to add lots of other elements as well, such as “friending” other users, user search, a notifications system, exporting to other formats, emailing users who want updates, and user permissions. All of these things added complexity and time. A good lesson for me though: when I thought I was ready to launch the website, I was actually still about a month out. It wasn’t until I tried to launch that I realized there were so many niggling things left undone. For instance, preventing users from registering under the same username or email address, or providing a password reset mechanism. Fortunately, I can carry over a lot of this code to any future projects down the line.

And feature creep. I am probably the worst offender ever when it comes to this. I always think to myself, “well if the software doesn’t do [X], then users won’t want to use it!” For me, this is an ever-present temptation to continue adding feature upon feature to the source while never actually launching. Since launching, there have been other features that I really want to add (and probably will). For instance, I want to give users the option to make articles “public” and tag them with keywords so that the community (all users within the cloud) can search for a particular type of document (e.g. a cover letter or a thank you note), see an example of it, suggest modifications and rate existing articles. In other words, have a community of contributors who continually improve cloud-based, crowd-sourced documents. That’s why I called the product “Dockumo”, or “doc” + “kumo” (“cloud” in Japanese).

Let me know what you guys think. I’m always happy to discuss improvements I can make or my technical/architectural decisions.

About page

Real Time Analytics Engine

My current project at work is to architect a solution for capturing network packet data sent over UDP, aggregating it, analyzing it and reporting it back up to our end users. At least, that’s the vision. It’s a big undertaking and there are several avenues of approach that we can take. To that end, I’ve started prototyping a test harness to measure the write and read performance of various software at different levels of our stack.

Our criteria are as follows (taken from a slide I created):

Our Criteria

Based on these four requirements pillars, I’ve narrowed down the platform choices to the following:

SERVER

nodejs

Strengths

      • Single-threaded, event loop model
      • Callbacks executed when request made (async), can handle 1000s more req than tomcat
      • No locks, no function in node directly performs I/O
      • Great at handling lots of small dynamic requests
      • Logo is cool

Weaknesses

      • Taking advantage of multi-core CPUs
      • Starting processes via child_process.fork()
      • APIs still in development
      • Not so good at handling large buffers

vert.x

Strengths

        • Allows writing code as single-threaded (Uses Distributed Event Bus, distributed peer-to-peer messaging system)
        • Can write “verticles” in any language
        • Built on JVM and scales over multiple cores w/o needing to fork multiple servers
        • Can be embedded in existing Java apps (more easily)

Weaknesses

        • Independent project (very new), maybe not well supported?
        • Verticles allow blocking of the event loop, which will cause blocking in other verticles. Distribution of work product makes it more likely to happen.
        • Multiple languages = debugging hell?
        • Is there overhead in scaling over multiple nodes?

PERSISTENCE

For our data, we are considering several NoSQL databases. Data integrity is not that important, because our data is not make-or-break for a company. However, the database must be highly performant, with upwards of 10-100k fixed-format writes per second but far fewer reads.

mongo

Architecture

        • Maps in-memory space directly to disk
        • B-tree indexing guarantees logarithmic performance

Data Storage

        • “Documents” akin to objects
        • Uses BSON (binary JSON)
        • Fields are key-value pairs
        • “Collections” akin to tables
        • No conversion necessary from object in data to object in programming language

Data-Replication

        • Mongo handles sharding for you
        • Uses a primary and secondary hierarchy (primary receives all writes)
        • Automatic failover

Strengths

        • Document BSON structure is very flexible, intuitive and appropriate for certain types of data
        • Easily query-able using mongo’s query language
        • Built-in MapReduce and aggregation
        • BSON maps easily onto JSON, makes it easy to consume in front end for charting/analytics/etc
        • Seems hip

Weaknesses

        • Questionable scalability and write performance for high volume bursts
        • Balanced B-Tree indices maybe not best choice for our type of data (more columnar/row based on timestamp)
        • NoSQL but our devs are familiar with SQL
        • High disk space/memory usage

 

cassandra

Architecture

        • Peer to peer distributed system, Cassandra partitions for you
        • No master node, so you can read-write anywhere

Data Storage

        • Row-oriented
        • Keyspace akin to database
        • Column family akin to table
        • Rows make up column families

Data Replication

        • Uses a “gossip” protocol
        • Data written to a commit log, then to an in-memory database (memtable) then to disk using a sorted-strings table
        • Customizable by user (allows tunable data redundancy)

Strengths

        • Highly scalable, adding new nodes give linear performance increases
        • No single point of failure
        • Read-write from anywhere
        • No need for caching software (handled by the database cluster)
        • Tunable data consistency (depends on use case, but can enforce transaction)
        • Flexible schema
        • Data compression built in (no perf penalty)

Weaknesses

        • Data modeling is difficult
        • Another query language to learn
        • Best stuff is used by Facebook, but perhaps not released to the public?

 

redis

            TBD

 

FRONT-END/REST LAYER

For our REST layer/front-end, I’ve built apps in jQuery, Angular, PHP and JSP, and hands-down my favorite is Angular. So that seems like the obvious choice here.

AngularJS

Strengths

      • No DOM Manipulation! Can’t say how valuable this is …
      • Built in testing framework
      • Intuitive organization of MVC architecture (directives, services, controllers, html bindings)
      • Built by Google, so trustworthy software

Weaknesses

      • Higher level than jQuery, so direct DOM manipulation is discouraged and more difficult to do
      • Fewer 3rd party packages released for angular
      • Who knows if it will be the winning javascript framework out there
      • Learning curve

Finally, for the REST API, I’ve also pretty much decided on express (if we go with node.js):

express

Strengths

      • Lightweight
      • Easy integration into node.js
      • Flexible routing and callback structure, authorization & middleware

Weaknesses

      • Yet another javascript package
      • Can’t think of any, really (compared to what we are using in our other app – Java Spring)

These are my thoughts so far. In the following posts, I’ll begin describing how I ended up prototyping our real time analytics engine, creating a test harness, and providing modularity so that we can make design decisions without too much downtime.

Mapping Google Datatable JSON format to Sencha GXT ListStores for charting

By now, if you’ve been keeping up with my blog, you’ll notice that we make extensive use of charting in our application. We decided to go with Sencha GXT charting because we were under time pressure to finish our product for a release. However, we weren’t so crunched for time that I decided to model the data in an unintelligent way. Since charts essentially graphically display tabular data, I wanted to ensure that if we were to move to a different graphing package (which we are, btw) in the future, our data was set up for easy consumption.

After tooling around a bit, I decided on the Google datatable JSON format for the response data from queries made to our server. The Google datatable format essentially just mimics tabular data in JSON. It’s also easily consumable by Google or Angular charts. Having used Angular charts in the past, I knew that the data format would map nicely to charts. However, the immediate problem still loomed: how to convert from the Google datatable JSON format to ListStores that GXT charts can consume?

Solution: In the end, I wrote an adapter with specific implementations for consuming our backend charting data, starting with a Sencha GXT-specific one. Here’s an example of the data we receive from our REST query.

//Note: some data omitted for readability
{
    "offset": 0,
    "totalCount": 1,
    "items": [
        {
            "title": "Registration Summary",
            "data": {
                "cols": [
                    {
                        "id": "NotRegistered",
                        "label": "Not Registered",
                        "type": "number"
                    },
                    {
                        "id": "Registered",
                        "label": "Registered",
                        "type": "number"
                    }
                ],
                "rows": [
                    {
                        "l": "9608",
                        "c": [
                            {
                                "v": "5",
                                "f": null
                            },
                            {
                                "v": "4",
                                "f": null
                            }
                        ]
                    },
                    {
                        "l": "9641",
                        "c": [
                            {
                                "v": "2",
                                "f": null
                            },
                            {
                                "v": "5",
                                "f": null
                            }
                        ]
                    },
                    {
                        "l": "9650SIP",
                        "c": [
                            {
                                "v": "3",
                                "f": null
                            },
                            {
                                "v": "0",
                                "f": null
                            }
                        ]
                    }
                ]
            }
        }
    ]
}

And here’s an example of how it’s converted into a Sencha ListStore:

[
    {
        v1=4,
        label0=NotRegistered,
        v0=5,
        label1=Registered,
        id1=Registered,
        type1=number,
        id0=NotRegistered,
        type0=number,
        f1=null,
        l=9608,
        f0=null
    },
    {
        v1=5,
        label0=NotRegistered,
        v0=2,
        label1=Registered,
        id1=Registered,
        type1=number,
        id0=NotRegistered,
        type0=number,
        f1=null,
        l=9641,
        f0=null
    },
    {
        v1=0,
        label0=NotRegistered,
        v0=3,
        label1=Registered,
        id1=Registered,
        type1=number,
        id0=NotRegistered,
        type0=number,
        f1=null,
        l=9650SIP,
        f0=null
    }
...
]
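Before getting into the Java, here is the same transformation as a short JavaScript sketch, purely to show the shape of the mapping. flattenDataTable is a hypothetical helper, not part of GXT or of our code base, and the second row with a missing cell is made up to show the zero default.

```javascript
// Flattens a datatable-style object ({ cols: [...], rows: [...] }) into one
// flat key/value object per row, using the same key scheme as the output above.
function flattenDataTable(data) {
    return data.rows.map(function (row) {
        var point = { l: row.l };
        data.cols.forEach(function (col, i) {
            var cell = row.c[i];
            point['id' + i] = col.id;
            point['label' + i] = col.label;
            point['type' + i] = col.type;
            // A missing cell is treated as zero so that v<i> always lines up with column i.
            point['v' + i] = cell ? cell.v : '0';
            point['f' + i] = cell ? cell.f : '';
        });
        return point;
    });
}

var points = flattenDataTable({
    cols: [
        { id: 'NotRegistered', label: 'Not Registered', type: 'number' },
        { id: 'Registered', label: 'Registered', type: 'number' }
    ],
    rows: [
        { l: '9608', c: [{ v: '5', f: null }, { v: '4', f: null }] },
        { l: '9641', c: [{ v: '2', f: null }, null] }   // hypothetical row with a missing cell
    ]
});
```

Each element of points plays the role of one DataPoint in the ListStore: one entry per bar, with the column index appended to every per-column key.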

Step 1: Define the ChartModel I/F to map to the JSON datatable format.

Here’s an example of a ChartModel Java object that maps to the JSON object.

public abstract class ChartModel {
	
	private static ChartFactory factory = GWT.create(ChartFactory.class);
	private static JsonListReader<ChartObjects, ChartObject> reader = new JsonListReader<ChartObjects, ChartObject>(factory, ChartObjects.class );
	
	/**
	 * These constants are fixed per the value retrieved from JSON
	 */
	public static final String POINT = "point";
	public static final String COL_LABEL = "label";
	public static final String COL_TYPE = "type";
	public static final String COL_ID = "id";
	public static final String CELL_VALUE = "v";
	public static final String CELL_FORMATTED_VALUE = "f";
	public static final String ROW_LABEL = "l";
	
	/*
	 * Define the data model
	 */
	
	public interface ChartObjects extends ModelList<ChartObject>{}
	
	public interface ChartObject {
		String getTitle();
		ChartData getData();
	}
	
	public interface ChartData {
		List<ChartCol> getCols();	//represents # series
		List<ChartRow> getRows();	//an array of array of cells, i.e. all the data
	}
	
	public interface ChartCol {
		String getId();
		String getLabel();
		String getType();
	}
	
	public interface ChartRow {
		List<ChartCell> getC();	//an array of chart cells, one per column
		String getL();	//label for this group of chart cells (e.g. a timestamp)
	}
	
	public interface ChartCell {
		String getV();	//get value
		String getF();	//get formatted value
	}

	public abstract ListStore<DataPoint> getListStore(ChartObject chartObject) throws Exception;
...
}

You can see that I also added an abstract getListStore method that requires any consumer of the ChartObject to define how the ChartObject gets modeled to the ListStore.

Step 2. Create an Adapter that extends the ChartModel and implements the function getListStore.

Here’s an example of a StackedBarChart model. (The models turned out to be different for multiple series in a stacked bar chart and, for example, a pie chart. Line charts and stacked bar charts turned out to be the same.)


public class StackedBarChartModel extends ChartModel {
	@Override
	public ListStore<DataPoint> getListStore(ChartObject chartObject) throws Exception {
		return new GXTChartModelAdapterImpl(chartObject).getSeries(ChartType.STACKED_BAR);
	}
}

You can see that this method instantiates a new adapter instance for GXT charts, passes in the chart object (modeled on the datatable JSON format) and returns the ListStore for the appropriate chart series (in this case, for a BarSeries GXT object).

Step 3. Map the chart object to a ListStore.

public class GXTChartModelAdapterImpl implements GXTChartModelAdapter {


	@Override
	public ListStore<DataPoint> getSeries(ChartType type) throws Exception {
		switch(type) {
			case LINE:
				return getLineSeriesListStore();
			case STACKED_BAR:
				return getStackedBarSeriesListStore();
			case BAR:
				return getLineSeriesListStore();
			case PIE:
				return getPieSeriesListStore();
			default:
				throw new Exception("Unknown data series type.");
		}
	}

	/**
	 * Returns a ListStore for a BarSeries GXT object to add to a chart
	 * @return
	 */
	private ListStore<DataPoint> getStackedBarSeriesListStore() {
		ListStore<DataPoint> _listStore = new ListStore<DataPoint>(new ModelKeyProvider<DataPoint>() {
			@Override
			public String getKey(DataPoint item) {
				return String.valueOf(item.getKey());
			}
		});
		
		int numPoints = this.getNumPoints();  //The number of rows in the rows obj in ChartObject
		int numSeries = this.getNumSeries();  //The number of columns in the cols obj in ChartObject
		
		for (int i = 0; i < numPoints; i++) {	
			//This key must be unique per DataPoint in the store
			DataPoint d = new DataPoint(indexStr(ChartModel.POINT,Random.nextInt()));
			
			for (int index = 0; index < numSeries; index++) {
				d.put(indexStr(ChartModel.COL_LABEL, index), chartObject.getData().getCols().get(index).getLabel());
				d.put(indexStr(ChartModel.COL_TYPE, index), chartObject.getData().getCols().get(index).getType());
				d.put(indexStr(ChartModel.COL_ID, index), chartObject.getData().getCols().get(index).getId());
			}
			
			ChartRow row = chartObject.getData().getRows().get(i);	//get the i-th point
			d.put(ChartModel.ROW_LABEL, row.getL());
			for (int j = 0; j < numSeries; j++) {
				if (row.getC().get(j) != null) {	//if the j-th cell of this point is not blank
					d.put(indexStr(ChartModel.CELL_VALUE, j), row.getC().get(j).getV());
					d.put(indexStr(ChartModel.CELL_FORMATTED_VALUE, j), row.getC().get(j).getF());
				} else {	//otherwise, assume 0 value
					d.put(indexStr(ChartModel.CELL_VALUE, j), "0");
					d.put(indexStr(ChartModel.CELL_FORMATTED_VALUE, j), "");
				}
			}
			_listStore.add(d);
		}
		
		return _listStore;
	}
...
}

A few things to note here.

  • A DataPoint is basically just a map of keys and value pairs. Since the DataPoint object is specific to the ListStore, the implementation is in the interface GXTChartModelAdapter. To do this, I used the instructions here: https://www.sencha.com/blog/building-gxt-charts/
  • Next, you can see that for each bar I have (not each stack), I create a new DataPoint object that contains the corresponding values of the specific row in the ChartObject.rows object. I know that each column corresponds to a stack in the bar, so I increment the index on the “v” key by 1 for each column (v0, v1 and so on) and use this as the key of the key / value pair added to the DataPoint.
  • Finally, because the number of columns defines the number of stacks I expect in each bar, if there isn’t a value (because our backend didn’t provide one), I assume the value is 0. This enforces that the v(n) in the first bar will correspond to v(n) in any subsequent bars.
  • I added an “l” value to each row in the ChartObject.rows obj which corresponds to a label for the entire stacked bar. I know that in the datatables JSON format you can specify the first cell in each row obj to be a String that serves as the label for each bar. However, it’s harder to enforce this type of thing in Java and we expected numeric data back each time.


Step 4. Pass in the data into a Sencha GXT Chart!

Now the easy part. Essentially the data is transformed from the datatables format to a ListStore with each DataPoint key: v0 – vN corresponding to a stack in a bar.

Thus when instantiating the chart, this is all I had to do:

private BarSeries<DataPoint> createStackedBar() {

		BarSeries<DataPoint> series = new BarSeries<DataPoint>();
		series.setYAxisPosition(Position.LEFT);
		series.setColumn(true);
		series.setStacked(true);
		
		//numSeries is the number of bars you want stacked, it conforms to the chartObject.cols().length
		for (int i = 0; i < numSeries; i++) {
			MapValueProvider valueProvider = new MapValueProvider(ChartModel.CELL_VALUE + i);
			series.addYField(valueProvider);
		}
		
		return series;
	}

When I instantiate the BarSeries, I just pass in one key for each of the stacked bars in my response data.
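The key scheme is simple enough to sketch outside of GXT. In the JavaScript below, stackKeys and barTotal are hypothetical helpers: the first builds the v0 to vN keys that the loop above hands to the series, and the second shows how those keys address the stacked values of a single flattened data point.

```javascript
// Builds the value keys v0..v(n-1), one per stacked segment.
function stackKeys(numSeries) {
    var keys = [];
    for (var i = 0; i < numSeries; i++) {
        keys.push('v' + i);
    }
    return keys;
}

// Sums the stacked segments of one flattened data point; values arrive as strings.
function barTotal(point, keys) {
    return keys.reduce(function (sum, key) {
        return sum + Number(point[key]);
    }, 0);
}

var keys = stackKeys(2);
var total = barTotal({ l: '9608', v0: '5', v1: '4' }, keys);
```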

Finally, the finished product.

Screen Shot 2014-04-09 at 5.03.44 PM