Clustering in Node.js (Part 3) – Basic flag parsing/loading for node.js processes

Using PM2’s API to start/stop/restart processes within our collector framework means we need a way of passing configuration data to each process so that each collector knows what to run and with what parameters. There are a few ways of doing this, but to my mind the clearest and easiest is plain command-line parameters. They’re familiar to users, they let us modify parameters on the fly when developing locally (i.e. without looking up and editing config files), and they give us insight into a running process when using OS-level command line binaries like ps. I also knew that our configs would change going forward, even though many of our collectors would reuse the same base configuration parameters, so I didn’t want to write a big switch statement with different logic for each inherited collector class, and I had no interest in going back into each class and modifying the same source code every time a flag changed. Thus, it made sense to create a separate config file for each type of collector, extending a base configuration, so that each collector could launch and error-check against its own specific configuration.
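For context, here’s a minimal sketch of what launching a collector through PM2’s programmatic API might look like (the script path, process name, and flag values here are hypothetical, not from our framework):

var pm2 = require('pm2');

pm2.connect(function(err) {
	if (err) throw err;

	//Everything in args lands in the child's process.argv,
	//which is exactly what the config classes below will parse
	pm2.start({
		script: './NewLoadBalancer.js',		//hypothetical collector script
		name: 'new-load-balancer',
		args: ['-p', '6000', '-i', '4', '-protocol', 'udp', '-retry']
	}, function(err, proc) {
		pm2.disconnect();
		if (err) throw err;
	});
});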

In a nutshell, here’s the behavior I wanted:

  1. Ability to specify a default base configuration for launching a process
  2. Ability to modify the default base configuration easily (e.g. adding or removing config flags)
  3. Abstract out parsing details and logic so a process would always have everything it needs to run, even if a user failed to pass in all necessary config info
  4. Ability to extend the base configuration with different configs based on the type of collector process

To do this, I created a default configuration object: a map whose keys correspond to the flags, where each entry holds the flag’s name, its default value, and whether the flag is required. Using this object, I can iterate through the process.argv values passed into the node.js process, determine which parameters were passed in and whether they match the corresponding configuration, and throw an error (and not launch) if there is a problem with the config. I only added enough code to meet our needs, so the configuration has some limitations, such as reading multiple parameters passed in after a flag (currently a flag may be followed by zero parameters or one, never more).

Take a look:

'use strict';

var InvalidConfigError = require('../../exceptions/InvalidConfigError.js');

/**
* Load Balancer Config is a class that abstracts configuration loading for load balancers
*
* @class LoadBalancerConfig
* @constructor
* @param {Array} configs An array passed in through process.argv
*/
var LoadBalancerConfig = function(configs) {

	/**
	 * Loads configuration based on the process.argv and fills in defaults where necessary
	 * Note, this will load the config set on the prototype scope
	 */
	function loadConfig(configs) {
		var self = this;

		/*
		 * Iterate over the process.argv values and peek at the next value if the flag expects one
		 */
		configs.forEach(function(c, index) {
			if (self.config.hasOwnProperty(c)) {
				var peek = self.config[c].peekat;
				if (configs.length > index + peek) {
					self.config[c].value = peek > 0 ? configs[index + peek] : true;
				} else {
					throw new InvalidConfigError('Invalid configuration: ' + c + ' should have a parameter trailing it.');
				}

				//If the "value" is itself one of our flags, the user omitted the parameter
				if (Object.keys(self.config).indexOf(self.config[c].value) > -1) {
					throw new InvalidConfigError('Invalid configuration: ' + c + ' should have a parameter trailing it.');
				}
			}
		});

		/*
		 * If certain flags are left out, fill them with the default
		 */
		for (var key in self.config) {
			if (self.config[key].value === undefined && self.config[key].default !== undefined) {
				console.log('%s is undefined, so setting default to %s', key, self.config[key].default);
				self.config[key].value = self.config[key].default;
			}
		}
	}

	if (configs) {
		loadConfig.call(this, configs);
	}

	this.getConfig = function() {
		return this.config;
	};
};

//Default configuration
//keys are the argument flags
//peekat is the offset to peek at for the value; if 0, the default should be false, and the flag's mere presence sets the value to true
LoadBalancerConfig.prototype.config = {
	'-p': {
		name: 'port',
		peekat: 1,
		default: 5005,
		required: true
	},
	'-i': {
		name: 'instances',
		peekat: 1,
		default: 1,
		required: true
	},
	'-retry': {
		name: 'retry',
		peekat: 0,
		default: false,
		required: false
	},
	'-name': {
		name: 'name',
		peekat: 1,
		default: 'Load_Balancer',
		required: false
	},
	'-protocol': {
		name: 'protocol',
		peekat: 1,
		required: true
	},
	'-workername': {
		name: 'workername',
		peekat: 1,
		required: false,
		default: 'Worker'
	}
};

/* 
 * @return {Object} config Returns a config object with all the defaults selected
 */ 
LoadBalancerConfig.prototype.getDefault = function() {
	for (var key in this.config) {
		this.config[key].value = this.config[key].default;
	}
	return this.config;
};

module.exports = LoadBalancerConfig;

It turns out that this pattern lets us extend configs very easily. See here:


'use strict';

var LoadBalancerConfig = require('./LoadBalancerConfig.js');
/**
*
* @class NewLoadBalancerConfig
* @constructor
* @param {Array} configs An array passed in through process.argv
*/
var NewLoadBalancerConfig = function(configs) {
	LoadBalancerConfig.call(this, configs);
};

NewLoadBalancerConfig.prototype = Object.create(LoadBalancerConfig.prototype);
//Note: config is inherited by reference, so these tweaks also mutate the base
//prototype's object; clone it first if several subclasses must stay independent
NewLoadBalancerConfig.prototype.config['-p'].default = 5005;
NewLoadBalancerConfig.prototype.config['-protocol'].default = 'udp';
NewLoadBalancerConfig.prototype.config['-name'].default = 'New_Load_Balancer';
NewLoadBalancerConfig.prototype.config['-workername'].default = 'Some Worker';

NewLoadBalancerConfig.prototype.config['-logdir'] = {
	name: 'log directory',
	peekat: 1,
	default: 'logs/',
	required: true
};

module.exports = NewLoadBalancerConfig;



Then, in the load balancer class, do this:

	var NewLoadBalancerConfig = require('./NewLoadBalancerConfig.js'),		//paths as in the config files above
		InvalidConfigError = require('../../exceptions/InvalidConfigError.js');

	var config;
	try {
		config = (new NewLoadBalancerConfig(process.argv)).getConfig();
	} catch (err) {
		if (err instanceof InvalidConfigError) {
			console.log('There was a configuration error in starting up!');
			config = (new NewLoadBalancerConfig()).getDefault();
		} else {
			throw err;
		}
	}

Here you can see how easy it is to extend the base config to do things like replace the defaults or add new flags required by that configuration type. All the options, error handling, and config loading are kept track of by the config class corresponding to the load balancer. I’ve already used this several times to add flags on the fly, and it has saved tons of time by sparing us from going back into each process and modifying the logic to handle individual loading of flags. I love simple and easy wins like this.

Here’s an example log output:

-p is undefined, so setting default to 5005
-i is undefined, so setting default to 1
-retry is undefined, so setting default to false
-name is undefined, so setting default to New_Load_Balancer
-protocol is undefined, so setting default to udp
-workername is undefined, so setting default to Some Worker
-logdir is undefined, so setting default to logs/
{ '-p':
   { name: 'port',
     peekat: 1,
     default: 5005,
     required: true,
     value: 5005 },
  '-i':
   { name: 'instances',
     peekat: 1,
     default: 1,
     required: true,
     value: 1 },
  '-name':
   { name: 'name',
     peekat: 1,
     default: 'New_Load_Balancer',
     required: false,
     value: 'New_Load_Balancer' },
  '-protocol':
   { name: 'protocol',
     peekat: 1,
     required: true,
     default: 'udp',
     value: 'udp' },
  '-workername':
   { name: 'workername',
     peekat: 1,
     required: false,
     default: 'Some Worker',
     value: 'Some Worker' },
  '-logdir':
   { name: 'log directory',
     peekat: 1,
     required: true,
     default: 'logs/',
     value: 'logs/' } }

Clustering in Node.js (Part 2) – Round-robin load balancing in a node cluster – notifying all workers

Here is part two of the round-robin load balancing post, which explains how to notify all workers depending on the type of data. This is actually very simple to do in the default architecture laid out in Part 1, and it provides a lot of utility: each worker can maintain state based on the type or amount of work it has completed, while the Load Balancer itself remains stateless. Essentially it works like this:

  1. Inspect the contents of the data. If the data warrants a notification to all workers, return the type of notification to be sent; if it doesn’t, don’t send any notification.
  2. If there is a notification to be sent, send it to all workers before actually distributing the data.
  3. If a worker receives the notification, it will update its state before processing the data.

It’s really that simple. To implement this, I added a prototypal method called broadcast(data) to the LoadBalancer class I had written. Broadcast inspects the data and determines what type of notification, if any, to send to the workers.

/**
 * Returns a notification type based on the data.  If a type is returned,
 * each worker will be notified with that type of notification, so workers
 * will need to implement notify() to take advantage of it.
 *
 * @method broadcast
 * @param {Object} data Data is of format { data: msg, rinfo: rinfo }
 * @return {Object} Returns a NotificationPrototype or undefined to disable
 */
MyLoadBalancer.prototype.broadcast = function(data) {
    if (needsNotification(data)) {
        return NotificationType;
    } else {
        return undefined;
    }
}

Next, it was necessary to modify the LoadBalancer to send the notification first if there was one to be sent. Essentially, for each data source/server implemented in LoadBalancer, I added this:

function serverWrapper() { ...
	c.on('data', function(data) {
		console.log('[%s] Receiving TCP data', self.name);
		//wrap the raw message so we don't shadow the incoming data parameter
		var wrapped = {
			remote_address: c.remoteAddress,
			data: data
		};
		self.counter++;

		var notification = createNotification.call(self, self.broadcast(wrapped), wrapped);
		if (notification) {
			self.notify.call(self, self.workers, notification);
		}
		self.distribute.call(self, self.workers, wrapped);
	});

    ...
}
/**
 * Default implementation of notify - notifies all workers about a notification
 *
 * @method notify
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.
 * @param {Object} notification The notification object to send to the workers
 */
LoadBalancer.prototype.notify = function(workers, notification) {
	var self = this;
	workers.forEach(function(worker) {
		//try/catch per worker so one dead worker doesn't halt the broadcast
		try {
			worker.send({
				type: 'notification',
				data: notification
			});
		} catch (e) {
			if (e instanceof WorkerDownError) {
				console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
				workers.splice(workers.indexOf(worker), 1);
			} else {
				throw e;
			}
		}
	});
};

/**
* Instantiate a notification if there is one
* @method createNotification
* @param {Function} BroadcastType The notification type constructor
* @param {Object} data The data associated with the receiver
* @return {Object} A new notification instance, or undefined
*/
function createNotification(BroadcastType, data) {
	if (!BroadcastType) {
		return undefined;
	} else {
		return new BroadcastType(BroadcastType.prototype.type, this.name, data);
	}
}

This code is really simple. You can see that when the data event fires on the server, the prototype’s broadcast method is called to determine whether a notification needs to be broadcast to the workers. If broadcast returns a notification type (‘BroadcastType’ here), you end up with an instance of the notification wrapping the data. That notification is then sent to each worker, and the worker can decide what to do with it.
If there isn’t any notification (i.e. it’s undefined), the LoadBalancer just proceeds as normal and distributes the work. Notifications here are really simple: essentially plain data objects wrapped in a javascript constructor. Here’s an example:

'use strict';

/**
* Notification class is a wrapper for a javascript object that gets passed upon some event triggered by a 
* data source consumed by a Load Balancer
*
* @class Notification
* @constructor
* @param {String} type A string of the type (similar to an enum)
* @param {String} source The name of the source instantiating this notification
* @param {Object} data The datagram bundled with this notification
*/
var Notification = function(type, source, data) {
	this.type = type;
	this.source = source;
	this.data = data;
	this.created = new Date().getTime();
};

//Human-readable representation of a notification
Notification.prototype.stringify = function() {
	var str = '';
	str += 'Notification @: ' + this.created + ' from ' + this.source + ' of type ' + this.type;
	return str;
};

//Subtypes should override this with their own type string
Notification.prototype.type = undefined;

module.exports = Notification;

//And in the worker class, implement this.notify
var Worker = function(worker, instance) { 
...
	this.notify = function(notification) {
		console.log('[%s:%d] Received notification from %s: %s', this.name, this.instance, notification.source, notification.type);
		switch(notification.type) {
			case 'Generic':
				break;
			case 'Example':
				break;
			default:
				break;
		}
	};
}

That’s pretty much all there is to it. Now when we receive data of a certain type, we can tell all workers to update a state variable. Neat!
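To tie the pieces together, here’s a minimal sketch of a concrete notification subtype, a broadcast implementation that returns it, and the worker-side message handler that routes the envelope sent by notify() above. The GenericNotification type is illustrative, not part of the framework:

var Notification = require('./Notification.js');

//A concrete notification type; its type string matches a case in notify()
var GenericNotification = function(type, source, data) {
	Notification.call(this, type, source, data);
};
GenericNotification.prototype = Object.create(Notification.prototype);
GenericNotification.prototype.type = 'Generic';

//In the load balancer subclass, broadcast returns the constructor itself
MyLoadBalancer.prototype.broadcast = function(data) {
	return needsNotification(data) ? GenericNotification : undefined;
};

//Worker side: route on the envelope set by LoadBalancer.prototype.notify
cluster.worker.on('message', function(msg) {
	if (msg.type === 'notification') {
		worker.notify(msg.data);
	} else {
		worker.doWork(msg.data, msg.rinfo);
	}
});

With that wired up, you get output like the following, with notifications arriving before the work is distributed: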

[GenericLoadBalancer] Starting up Load Balancer on Port 5005 with 2 workers.
[GenericLoadBalancer] Server opening socket and listening on port 5005
[GenericLoadBalancer] Finished starting up.  Instantiating workers.
Instantiating Worker Generic Worker 0
Instantiating Worker Generic Worker 1
[GenericLoadBalancer] Received a message from 1.1.0.1:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[GenericLoadBalancer] Received a message from 1.2.0.2:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Doing work [0].
[Generic Worker:1] Doing work [0].

Clustering in Node.js (Part 1) – Round-robin load balancing in a node cluster

As we start to scale out our analytics service here, we’ve started thinking of ways to leverage node’s very good event-driven model across multiple cores and eventually multiple machines. Node clustering is still in its nascent stages, so there’s not a whole lot of functionality and it relies on OS-level forking to “scale out.” While this works in principle, it means that clustering in node is not native per se and that node’s cluster module essentially just provides a weak layer of abstraction across several node processes that are effectively launched from your command line.

Although node already implements load balancing for HTTP requests in its cluster module, it doesn’t handle load balancing for other protocols. Also, node’s load balancing algorithm had a few quirks because it relied on the OS’s scheme for waking up dormant processes (see http://strongloop.com/strongblog/whats-new-in-node-js-v0-12-cluster-round-robin-load-balancing/). Hence, in Node.js v0.11.2, the developers at Joyent decided to move to a round-robin scheme. Round-robin is not terribly sophisticated and makes no judgment as to the type of work a worker is currently processing. While it works well when each request requires essentially the same amount of work, it’s not so good when the incoming work varies in nature. At least it’s conceptually simple and simple to implement.
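In fact, the core of round-robin fits in a couple of lines; stripped down, the selection is just a running counter indexing into the worker pool (a simplified version of the distribute() method shown later):

//Message k goes to worker k % N: with 3 workers,
//messages 0,1,2,3,4 land on workers 0,1,2,0,1
function pickWorker(workers, counter) {
	return workers[counter % workers.length];
}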

My first major stumbling block with node’s cluster module was the fact that this load balancing isn’t supported for UDP. Furthermore, we receive analytics data using a range of protocols (e.g. SFTP, TCP/IP, FTP, etc.), so I didn’t want to rely on node’s native libraries in case no one thought it important enough to extend load balancing to a variety of protocols. Thus, I had to come up with my own generalized architecture for implementing load balancing. Have a look:


var dgram = require('dgram'),
	cluster = require('cluster'),
	WorkerPrototype = require('../workers/WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	net = require('net');

// Private Functions
var protocols = {
	'udp': {
		'start': listen_udp,
		'stop': shutdown_udp
	},
	'tcp_server': {
		'start': open_tcp_server,
		'stop': shutdown_tcp_server
	},
	'tcp_client': {
		'start': open_tcp_client,
		'stop': shutdown_tcp_client
	}
};

function listen(callback) {
	protocols[this.protocol].start.call(this, callback);
}

function shutdown(callback) {
	protocols[this.protocol].stop.call(this, callback);
}

//This function will restart the server every retry interval if restart is enabled
function restart(callback) {
	var self = this;
	console.log('[%s] Trying restart in %d ms ...', self.name, self.retrydelay);
	setTimeout(function() {
		self.retrydelay = Math.floor(self.retrydelay * 1.5);
		listen.call(self, callback);
	}, self.retrydelay );
}

//UDP Handling
function listen_udp(callback) {
	var self = this;
	var server = dgram.createSocket('udp4');
	server.on('error', function (err) {
		console.log('[%s] Error:\n' + err.stack, self.name);
		try {
			server.close();
		} catch (err) {
			if (err) {
				console.log('[%s] Error closing udp socket: %s', self.name, err);
			}
		} finally {
			if (self.restart) {
				restart.call(self, callback);
			} else {	//otherwise, just call the callback
				if (callback)
					callback(err);
			}
		}
	});

    server.on('message', function (msg, rinfo) {
		console.log('[%s] Received a message from ' + rinfo.address + ':' + rinfo.port + ' @ [%s] of size [%d] bytes.', self.name, new Date(), msg.length);
		self.counter++;
		self.distribute.call(self, self.workers, {
			data: msg, 
			rinfo: rinfo
		});
	});

	server.bind(self.port, function() {
	    console.log('[%s] Server opening socket and listening on port ' + self.port, self.name);
	});

	server.on('listening', function () {
	  	var address = server.address();
	  	var msg = '[' + self.name + ']  SUCCESS! UDP socket now listening @ ' +
	      	address.address + ':' + address.port;
	  	if (callback) {
	  		self.retrydelay = self.restartdelay;
	  		callback(null, msg);
	  	}
	});

	self.server = server;
	return server;
}

function shutdown_udp(callback) {
	var self = this;
	if (self.server) {
		self.server.close();
		var msg = '[' + self.name + '] Load Balancer shutdown successfully.';
		if (callback)
			callback(null, msg);
		
	} else {
		var err = '[' + self.name + '] Load Balancer not defined!  Could not be shutdown.';
		callback(err);
	}
}

//TCP Server - accepts connections from TCP clients
function open_tcp_server(callback) {
   //implementation goes here
}

function shutdown_tcp_server(callback) {
   //implementation goes here
}

//TCP Client - creates a connection to another TCP server
function open_tcp_client(callback) {
   //implementation goes here
}

function shutdown_tcp_client(callback) {
   //implementation goes here
}

//Private Load Balancer
function instantiateWorkers() {
	for (var i = 0; i < this.instances; i++) {
		console.log('Instantiating Worker %s %d', this.worker_name, i);
		var worker = cluster.fork({
			name: this.worker_name,
			id: i
		});
	    this.workers.push(worker);
	}
}

/**
* Load Balancer is a class that wraps functionality for opening/closing a UDP or TCP connection
* It also implements default load balancing based on the number of workers specified.
*
* @class LoadBalancer
* @constructor
* @param {String} name The name for this load balancer
* @param {Number} port The port number
* @param {String} protocol Either udp or tcp at the moment, can be extended later on
* @param {String} worker_name Give each worker a name
* @param {Number} instances The number of workers to fork; each worker receives messages from the master
* @param {Boolean} retryonerror If true, the balancer tries to reopen its socket on error
*/
var LoadBalancer = function(name, port, protocol, worker_name, instances, retryonerror) {

	if (!port || !protocol) {
		throw new Error('Could not instantiate load balancer.  Port/Protocol/Workers undefined.');
	}

	if (instances <= 0) {
		throw new Error('Not enough worker instances for workers.  Failed to start.');
	}

	if (!protocols[protocol]) {
		throw new Error('Could not instantiate load balancer.  Unknown protocol: ' + protocol);
	}

	//Set by User
	this.name = name;
	this.port = port;
	this.protocol = protocol;
	this.worker_name = worker_name;
	this.instances = instances;
	this.restart = retryonerror ? retryonerror : false;	//if true, will try to reopen connection upon error
	this.restartdelay = 5000;	//will multiply by 1.5 each time and reset when connection is reestablished

	//Private members
	this.workers = [];
	this.server;
	this.counter = 0;
	this.retrydelay = this.restartdelay;

	var self=this;

	this.start = function(callback) {
		listen.call(self, function(err, msg) {
			self.counter = 0;	//reset counter
			if (!err)
				instantiateWorkers.call(self);

			if (callback) {
				callback(err, msg);
			}
		});
	}

	this.stop = function(callback) {
		shutdown.call(this, function(err, msg) {
			if (err) 
				console.log(err);
			if (callback)
				callback(err, msg);
		});
	}

	//Note: named restartServer so it doesn't clobber the restart flag set above
	this.restartServer = function() {
		var self = this;
		this.stop(function(err, msg) {
			if (err) {
				throw new Error(err);
			} else {
				self.start();
			}
		});
	};
};

/**
 * Default implementation of distribute - sequential cycling through workers
 *
 * @method distribute
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.  
 * @param {Object} data An object containing the data received on the port for this Load Balancer.  Ex:  UDP would send data {data: msg, rinfo: rinfo}
 */
LoadBalancer.prototype.distribute = function(workers, data) {
	var self = this;
	var worker = workers[self.counter % workers.length];
	try {
		if (worker) {
			worker.send(data);
		}
	} catch (e) {
		if (e instanceof WorkerDownError) {
			console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
			workers.splice(workers.indexOf(worker), 1);
		} else {
			throw e;
		}
	}
};

module.exports = LoadBalancer;

Basically, this prototype does a few things:

    1. Implements generalized servers by protocol to receive data

You can see that when one instantiates or extends the load balancer prototype, the user will need to specify the protocol to use. Since things like ports and instances of workers are common to all load balancers, these are private attributes of the load balancer instance. Essentially, all servers under this model work in the same way. A master server is instantiated which receives all data over a socket. When the server receives a request or message, it will pass it off to one of N workers (specified by the user) which is running in its own process. That worker then does the work.
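Stripped of the protocol handling, that master/worker hand-off reduces to node’s stock cluster messaging; a minimal sketch:

var cluster = require('cluster');

if (cluster.isMaster) {
	//The master owns the socket and forwards each message to a worker
	var worker = cluster.fork();
	worker.send({ data: 'payload' });
} else {
	//Each worker runs in its own process and receives work over IPC
	process.on('message', function(msg) {
		console.log('worker %d got: %s', cluster.worker.id, msg.data);
	});
}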

    2. Implements default methods to start, stop and restart servers

A user doesn’t need to concern herself with starting, stopping, and restarting servers should a socket go down. That’s why there is some dummy logic for reopening a socket upon error (e.g. if the socket is flooded and closes). The default mechanism is to try reopening the socket after 5 seconds, with the delay growing by 50% on each failed attempt (5000 ms, then 7500 ms, then 11250 ms, and so on) and resetting once the connection is reestablished.

    3. Can take an arbitrary number of “Workers” to handle the load

The load balancer wouldn’t be of much use if it handled the entire workload by itself. This prototype (and extended versions of it) actually instantiates itself N times. I found this to be a quirk of node: launching separate worker scripts (via the cluster settings) instead of forking the same process N times prevented me from communicating with the worker processes at all. For example, when I launched the load balancer with four worker processes defined in separate javascript files, the main load balancer could not communicate with them, even when I passed a reference to the cluster into each worker process. The only way I could get communication to work was to fork the workers from the same node cluster. This behavior struck me as really strange, and it makes for some weird-looking code (i.e. the default node clustering pattern, with a massive if statement in a single javascript file that represents both the cluster and all of its workers).

    4. Offers a generic round-robin load balancer irrespective of protocol

The distribute function bound to the load balancer prototype is called whenever data is received by the master process. I added a default implementation so users wouldn’t have to implement their own method to use the prototype out of the box, but it is easy to override. If you, as the user, wanted a different type of load balancing, you would just need to bind a new distribute() function to the prototype, as in the sketch below.
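For instance, a random-assignment strategy (just an illustration, not part of the framework) drops in like this:

//Replace round-robin with random assignment by overriding distribute()
UDPLoadBalancer.prototype.distribute = function(workers, data) {
	var worker = workers[Math.floor(Math.random() * workers.length)];
	if (worker) {
		worker.send(data);
	}
};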

The last implementation step for the user is to extend the load balancer and specify what the workers do whenever data is received across the wire. A default implementation might look like this:

var cluster = require('cluster'),
	LoadBalancer = require('./LoadBalancerPrototype.js'),
	UDPWorker = require('../workers/UDPWorker.js');

var UDPLoadBalancer = function(name, port, worker_name, instances, retryonerror) {
	LoadBalancer.call(this, name, port, 'udp', worker_name, instances, retryonerror);

	console.log('Instantiating UDP Load Balancer on port %d with %d workers', port, instances);
};

UDPLoadBalancer.prototype = Object.create(LoadBalancer.prototype);
UDPLoadBalancer.prototype.name = 'UDP Load Balancer';
//CONFIG: To be set by the user, define the type of worker
UDPLoadBalancer.prototype.worker = UDPWorker;


// Custom distribution function - sequential.
// Overwrite this function to specify how you want to distribute the load.
// parameters msg, rinfo change depending on the protocol of the Load Balancer
// This is just an example - it is not strictly necessary

// UDPLoadBalancer.prototype.distribute = function(workers, msg, rinfo) {
// };


if (cluster.isMaster) {
	var port = 5005, instances = 2, retry = false;
	for (var i = 0; i < process.argv.length; i++) {
		if (process.argv[i] == '-p') {
			port = process.argv[i+1] ? parseInt(process.argv[i+1], 10) : port;
		}

		if (process.argv[i] == '-i') {
			instances = process.argv[i+1] ? parseInt(process.argv[i+1], 10) : instances;
		}

		if (process.argv[i] == '-r') {
			retry = true;
		}
	}

	var udpLoadBalancer = new UDPLoadBalancer('UDP Balancer 1', port, UDPWorker.prototype.name, instances, retry);
	udpLoadBalancer.start(function(err, msg) {
		if (err) {
			console.log(UDPLoadBalancer.prototype.name + ' error starting up: ' + err);
			udpLoadBalancer.stop(function(err, msg) {
				console.log('UDP load balancer shut down: ' + err + '   ' + msg);
			});
		} else {
			//successful
		}
	});

} else {
	var worker = new UDPLoadBalancer.prototype.worker(cluster.worker, cluster.worker.id);

	cluster.worker.on('online', function() {
		console.log('[%s:%d] is online.', worker.name, worker.instance);
	})
	//msg is of format {data: udp_msg, rinfo: rinfo}
	cluster.worker.on('message', function(msg) {
		worker.doWork(msg.data, msg.rinfo);
	});
}

module.exports = UDPLoadBalancer;

And here’s an example implementation of a worker that handles the load:

var WorkerPrototype = require('./WorkerPrototype.js'),
	WorkerDownError = require('../exceptions/WorkerDownError.js'),
	Decoder = require('../controllers/Decoder.js');

var UDPWorker = function(worker, instance) {
	WorkerPrototype.call(this, worker, UDPWorker.prototype.name, instance);

        //implementations need to specify what work to do upon receiving a message
	this.doWork = function(msg, rinfo) {
		if (!this.worker || this.worker.suicide === true) {
			throw new WorkerDownError('[%s:%d] Worker is down. Not doing work.', this.name, this.instance);
		}
		console.log('[%s:%d] Doing work [%d].', this.name, this.instance, this.counter);
	}
};

UDPWorker.prototype.name = 'UDP Worker';
UDPWorker.prototype.getPath = function () { 
	var fullPath = __filename; 
	return fullPath; 
}

module.exports = UDPWorker;

That’s pretty much all there is to it. You can see an example of messages being sent across the wire and how load is distributed for UDP:

Instantiating Worker UDP Worker 0
Instantiating Worker UDP Worker 1
Instantiating Worker UDP Worker 2
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:35 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866055940.
[UDP Balancer 1] Received a message from 1.1.0.2:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056190.
[UDP Balancer 1] Received a message from 1.1.0.3:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056439.
[UDP Balancer 1] Received a message from 1.1.0.4:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056687.
[UDP Balancer 1] Received a message from 1.1.0.5:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056937.
[UDP Balancer 1] Received a message from 1.1.0.6:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866057187.
[UDP Balancer 1] Received a message from 1.2.0.1:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057214.
[UDP Balancer 1] Received a message from 1.1.0.7:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057437.
[UDP Balancer 1] Received a message from 1.2.0.2:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057463.
[UDP Balancer 1] Received a message from 1.1.0.8:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057687.
[UDP Balancer 1] Received a message from 1.2.0.3:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057714.
[UDP Balancer 1] Received a message from 1.1.0.9:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057938.
[UDP Balancer 1] Received a message from 1.2.0.4:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866057965.
[UDP Balancer 1] Received a message from 1.1.0.10:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058188.
[UDP Balancer 1] Received a message from 1.2.0.5:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058215.
[UDP Balancer 1] Received a message from 1.1.0.11:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058437.
[UDP Balancer 1] Received a message from 1.2.0.6:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058465.
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058655.
[UDP Balancer 1] Received a message from 1.1.0.12:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [6].
[DECODER]: Decoding udp message with timestamp: 1409866058689.
[UDP Balancer 1] Received a message from 1.2.0.7:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [6].