Clustering in Node.js (Part 2) – Round-robin load balancing in a node cluster – notifying all workers

Here is part two of the round-robin load balancing post, which explains how to notify all workers depending on the type of data. This actually is very simple to do in the default architecture as laid out in Part 1. This provides a lot of utility because it allows each worker to maintain a state based on the type or amount of work its completed, while the Load Balancer itself remains stateless. Essentially it works like this:

  1. Inspect the contents of the data. If the data warrants a notification to all workers, then return the type of notification to be sent. If it doesn’t, don’t send any notification
  2. If there is a notification to be sent to workers, send it to all workers before actually distributing the data.
  3. If a worker receives the notification, it will update its state before processing the data.

It’s really that simple. To implement this, I added a prototypal method to the LoadBalancer class I had written, called broadcast(data). Broadcast essentially inspects the data and determines what type of notification to send, if any, to the workers.

/**
 * This method if returns a notification type based on the data.  If the type
 * is not defined, then each worker will be notified by the type of notification
 * so the workers will need to implmement notify() to take advantage of the type
 *
 * @method broadcast
 * @param {Object} data Data is of format { data: msg, rinfo: rinfo }
 * @return {Object} Returns a NotificationPrototype or undefined to disable
 */
MyLoadBalancer.prototype.broadcast = function(data) {
    if (needsNotification(data)) {
        return NotificationType;
    } else {
        return undefined;
    }
}

Next, it was necessary to modify the LoadBalancer to send the notification first if there was one to be sent. Essentially, for each data source/server implemented in LoadBalancer, I added this:

function serverWrapper() { ...
    c.on('data', function(data) {
	  		console.log('[%s] Receiving TCP data', self.name);
	  		var data = {
	  			remote_address: c.remoteAddress,
	  			data: data
	  		};
	  		self.counter++;

	  		var notification = createNotification.call(self, self.broadcast(data), data);
			if (notification) {
		  		self.notify.call(self, self.workers, notification);	
		  	}
		  	self.distribute.call(self, self.workers, data);	
	  	});

    ...
}
/**
 * Default implementation of notify - notifies all workers about some data
 *
 * @method distribute
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.  
 * @param {Object} data An object containing the message to send to the workers
 */
LoadBalancer.prototype.notify = function(workers, notification) {
	var self = this;
	try {
		workers.forEach(function(worker) {
			worker.send({
				type: 'notification',
				data: notification
			});
		});
	} catch (e) {
		if (e instanceof WorkerDownError) {
			console.log('Worker %s is down.  Removing from workers for [%s:%d]', worker.name, self.name, self.instance);
			workers.splice(workers.indexOf(worker), 1);
		} else {
			throw e;
		}
	}
};

/**
* Instantiate a notification if there is one
* @method createNotification
* @param {Function} BroadcastType The notification type
* @param {Object} data The data associated with the receiver
*/
function createNotification(BroadcastType, data) {
	if (!BroadcastType) {
		return undefined;
	} else {
		return new BroadcastType(BroadcastType.prototype.type, this.name, data);
	}
}

This code is really simple. You can see that when the data event is thrown based on the server type, the prototype’s broadcast method is called to determine whether there needs to be a notification broadcast to each of the workers. If broadcast returns a notification type (‘BroadcastType’ here), then you’ll end up with an instance of the notification wrapping the data. Then for each worker, that notification is sent and the worker can determine what to do with the notification.
If there isn’t any notification (i.e. it’s undefined), then the LoadBalancer will just proceed as normal and distribute the work. In this code, notifications are really simple, essentially just plain data objects wrapped in a function in javascript. Here’s an example:

'use strict'

/**
* Notification class is a wrapper for a javascript object that gets passed upon some event triggered by a 
* data source consumed by a Load Balancer
*
* @class Notification
* @constructor
* @param {String} type A string of the type (similar to an enum)
* @param {String} source The name of the source instantiating this notification
* @param {Object} data The datagram bundled with this notification
*/
var Notification = function(type, source, data) {
	this.type = type;
	this.source = source;
	this.data = data;
	this.created = new Date().getTime();
};

//Rebind toString
Notification.prototype.stringify = function() {
	var str = '';
	str += 'Notification @: ' + this.created + ' from ' + this.source + ' of type ' + this.type;
	return str;
};

Notification.prototype.type = undefined;

module.exports = Notification;

//And in the worker class, implement this.notify
var Worker = function(worker, instance) { 
...
	this.notify = function(notification) {
		console.log('[%s:%d] Received notification from %s: %s', this.name, this.instance, notification.source, notification.type);
		switch(notification.type) {
			case 'Generic':
				break;
			case 'Example':
				break;
			default:
				break;
		}
	};
}

That’s pretty much all there is to it. Now when we receive data of a certain type, we can tell all workers to update a state variable. Neat!

[GenericLoadBalancer] Starting up Load Balancer on Port 5005 with 2 workers.
[GenericLoadBalancer] Server opening socket and listening on port 5005
[GenericLoadBalancer] Finished starting up.  Instantiating workers.
Instantiating Worker Generic Worker 0
Instantiating Worker Generic Worker 1
[GenericLoadBalancer] Received a message from 1.1.0.1:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[GenericLoadBalancer] Received a message from 1.2.0.2:65432 @ [Fri Sep 12 2014 15:11:52 GMT-0700 (PDT)] of size [48] bytes.
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:1] Received notification from GenericLoadBalancer: Generic - WAKE UP LAZY WORKERS!
[Generic Worker:2] Doing work [0].
[Generic Worker:1] Doing work [0].
Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s