As we scale out our analytics service, we've been thinking about how to take advantage of Node's event-driven model across multiple cores and, eventually, multiple machines. Node clustering is still in its nascent stages: there isn't a whole lot of functionality, and it relies on OS-level forking to "scale out." While this works in principle, it means that clustering in Node isn't native per se; the cluster module essentially provides a thin layer of abstraction over several Node processes that are effectively launched from your command line.
Although Node already implements load balancing for HTTP requests in its cluster module, it doesn't handle load balancing for other protocols. Node's original approach also had a few quirks, since it left the balancing to the OS's scheme of waking up dormant processes (see http://strongloop.com/strongblog/whats-new-in-node-js-v0-12-cluster-round-robin-load-balancing/). Hence, in Node.js v0.11.2, the developers at Joyent decided to switch to a round-robin scheme. Round-robin is not terribly sophisticated and makes no judgment about the kind of work a worker is currently processing. It works well when every request requires roughly the same amount of work, but less so when the incoming work varies in nature. At least it's conceptually simple and easy to implement.
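For HTTP and TCP servers the built-in behavior may already be enough. As a point of reference, here's roughly what opting into Node's own round-robin scheduling looks like with the stock cluster module (the port and worker count here are arbitrary); note that this only applies to connection-based servers that cluster knows how to share, which is exactly the limitation that pushed me toward the custom balancer below.

```javascript
var cluster = require('cluster');
var http = require('http');

// Ask the cluster master to hand out connections round-robin.
// Must be set before the first fork; cluster.SCHED_NONE leaves scheduling to the OS.
cluster.schedulingPolicy = cluster.SCHED_RR;

if (cluster.isMaster) {
  for (var i = 0; i < 4; i++) {
    cluster.fork();
  }
} else {
  // All workers share the same listening socket; the master distributes connections.
  http.createServer(function (req, res) {
    res.end('handled by worker ' + cluster.worker.id + '\n');
  }).listen(8000);
}
```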
My first major stumbling block with Node's cluster module was that this load balancing isn't supported for UDP. Furthermore, we receive analytics data over a range of protocols (SFTP, TCP, FTP, and so on), so I didn't want to depend on Node's native libraries in case nobody thought it important enough to extend load balancing to other protocols. Instead, I came up with my own generalized architecture for load balancing. Have a look:
```javascript
var dgram = require('dgram'),
    cluster = require('cluster'),
    WorkerPrototype = require('../workers/WorkerPrototype.js'),
    WorkerDownError = require('../exceptions/WorkerDownError.js'),
    net = require('net');

// Private Functions
var protocols = {
  'udp':        { 'start': listen_udp,      'stop': shutdown_udp },
  'tcp_server': { 'start': open_tcp_server, 'stop': shutdown_tcp_server },
  'tcp_client': { 'start': open_tcp_client, 'stop': shutdown_tcp_client }
};

function listen(callback) {
  protocols[this.protocol].start.call(this, callback);
}

function shutdown(callback) {
  protocols[this.protocol].stop.call(this, callback);
}

// Restarts the server after the current retry delay if restart is enabled
function restart(callback) {
  var self = this;
  console.log('[%s] Trying restart in %d ms ...', self.name, self.retrydelay);
  setTimeout(function() {
    self.retrydelay = Math.floor(self.retrydelay * 1.5);
    listen.call(self, callback);
  }, self.retrydelay);
}

// UDP handling
function listen_udp(callback) {
  var self = this;
  var server = dgram.createSocket('udp4');

  server.on('error', function (err) {
    console.log('[%s] Error:\n' + err.stack, self.name);
    try {
      server.close();
    } catch (err) {
      if (err) {
        console.log('[%s] Error closing udp socket: %s', self.name, err);
      }
    } finally {
      if (self.restart) {
        restart.call(self, callback);
      } else {
        // otherwise, just hand the error back to the caller
        if (callback) callback(err);
      }
    }
  });

  server.on('message', function (msg, rinfo) {
    console.log('[%s] Received a message from ' + rinfo.address + ':' + rinfo.port +
      ' @ [%s] of size [%d] bytes.', self.name, new Date(), msg.length);
    self.counter++;
    self.distribute.call(self, self.workers, { data: msg, rinfo: rinfo });
  });

  server.bind(self.port, function() {
    console.log('[%s] Server opening socket and listening on port ' + self.port, self.name);
  });

  server.on('listening', function () {
    var address = server.address();
    var msg = '[' + self.name + '] SUCCESS! UDP socket now listening @ ' +
      address.address + ':' + address.port;
    if (callback) {
      self.retrydelay = self.restartdelay;
      callback(null, msg);
    }
  });

  self.server = server;
  return server;
}

function shutdown_udp(callback) {
  var self = this;
  if (self.server) {
    self.server.close();
    var msg = '[' + self.name + '] Load Balancer shutdown successfully.';
    if (callback) callback(null, msg);
  } else {
    var err = '[' + self.name + '] Load Balancer not defined! Could not be shutdown.';
    callback(err);
  }
}

// TCP handling - a TCP server accepts incoming connections, a TCP client connects out to another server
function open_tcp_server(callback) { /* implementation goes here */ }
function shutdown_tcp_server(callback) { /* implementation goes here */ }
function open_tcp_client(callback) { /* implementation goes here */ }
function shutdown_tcp_client(callback) { /* implementation goes here */ }

// Private load balancer helper: forks one worker process per instance
function instantiateWorkers() {
  for (var i = 0; i < this.instances; i++) {
    console.log('Instantiating Worker %s %d', this.worker_name, i);
    var worker = cluster.fork({ name: this.worker_name, id: i });
    this.workers.push(worker);
  }
}

/**
 * LoadBalancer wraps the functionality for opening/closing a UDP or TCP connection.
 * It also implements default load balancing across the number of workers specified.
 *
 * @class LoadBalancer
 * @constructor
 * @param {String} name The name for this load balancer
 * @param {Number} port The port number
 * @param {String} protocol Either udp or tcp at the moment, can be extended later on
 * @param {String} worker_name A name given to each worker
 * @param {Number} instances The number of worker processes to be "forked"; each worker only receives the messages forwarded to it
 * @param {Boolean} retryonerror Whether to reopen the connection upon error
 */
var LoadBalancer = function(name, port, protocol, worker_name, instances, retryonerror) {
  if (!port || !protocol) {
    throw new Error('Could not instantiate load balancer. Port/Protocol/Workers undefined.');
  }
  if (instances <= 0) {
    throw new Error('Not enough worker instances for workers. Failed to start.');
  }
  if (!protocols[protocol]) {
    throw new Error('Could not instantiate load balancer. Unknown protocol ' + protocol);
  }

  // Set by the user
  this.name = name;
  this.port = port;
  this.protocol = protocol;
  this.worker_name = worker_name;
  this.instances = instances;
  this.restart = retryonerror ? retryonerror : false; // if true, will try to reopen the connection upon error
  this.restartdelay = 5000; // multiplied by 1.5 on each retry, reset when the connection is reestablished

  // Private members
  this.workers = [];
  this.server;
  this.counter = 0;
  this.retrydelay = this.restartdelay;

  var self = this;

  this.start = function(callback) {
    listen.call(self, function(err, msg) {
      self.counter = 0; // reset counter
      if (!err) instantiateWorkers.call(self);
      if (callback) {
        callback(err, msg);
      }
    });
  };

  this.stop = function(callback) {
    shutdown.call(this, function(err, msg) {
      if (err) console.log(err);
      if (callback) callback(err, msg);
    });
  };

  this.restart = function() {
    this.stop(function(err, msg) {
      if (err) {
        throw new Error(err);
      } else {
        this.start();
      }
    });
  };
};

/**
 * Default implementation of distribute - sequential cycling through workers.
 *
 * @method distribute
 * @param {Array[cluster.Worker]} workers The array of workers managed by this LoadBalancer.
 * @param {Object} data The data received on the port for this load balancer. Ex: UDP sends {data: msg, rinfo: rinfo}
 */
LoadBalancer.prototype.distribute = function(workers, data) {
  var self = this;
  var worker = workers[self.counter % workers.length];
  try {
    if (worker) {
      worker.send(data);
    }
  } catch (e) {
    if (e instanceof WorkerDownError) {
      console.log('Worker %s is down. Removing from workers for [%s:%d]',
        worker.name, self.name, self.instance);
      workers.splice(workers.indexOf(worker), 1);
    } else {
      throw e;
    }
  }
};

module.exports = LoadBalancer;
```
Basically, this prototype does a few things:
- Implements generalized servers by protocol to receive data
When you instantiate or extend the load balancer prototype, you specify which protocol it should listen on. Things like the port and the number of worker instances are common to all load balancers, so they are attributes of the load balancer instance. All servers under this model work the same way: a master server is instantiated and receives all data over a socket. When the server receives a request or message, it passes it off to one of N workers (N is specified by the user), each running in its own process, and that worker does the actual work.
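To make the constructor arguments concrete, here is a minimal sketch of instantiating the prototype directly (the balancer name, worker label, port, and instance count are made up for illustration, and the cluster master/worker branching shown in the full example later is left out):

```javascript
var LoadBalancer = require('./LoadBalancerPrototype.js');

// Hypothetical values - only the 'udp' protocol key must match the protocols map above.
var balancer = new LoadBalancer(
  'Analytics Balancer', // name used in log output
  5005,                 // port the master listens on
  'udp',                // protocol key ('udp', 'tcp_server' or 'tcp_client')
  'Analytics Worker',   // label passed to each forked worker
  4,                    // number of worker processes to fork
  true                  // retry on socket error
);

balancer.start(function (err, msg) {
  console.log(err || msg);
});
```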
- Implements default methods to start, stop and restart servers
A user doesn't need to worry about starting, stopping, and restarting servers should a socket go down; there is some basic logic for reopening a socket upon error (e.g. if the socket is flooded and closes). The default behavior is to try reopening the socket after 5 seconds, with the delay growing by 50% each time the socket fails to open and resetting once a connection is reestablished.
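To make the arithmetic concrete, this little sketch prints the sequence of retry delays the balancer would use (assuming the 5-second default and no successful reconnect in between):

```javascript
// Retry delay progression: 5000, 7500, 11250, 16875, 25312, ... ms
var delay = 5000; // matches this.restartdelay
for (var attempt = 1; attempt <= 5; attempt++) {
  console.log('attempt %d: wait %d ms', attempt, delay);
  delay = Math.floor(delay * 1.5);
}
```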
- Can take an arbitrary number of “Workers” to handle the load
The load balancer wouldn't be of much use if it handled the entire workload by itself. This prototype (and extended versions of it) actually instantiates itself N times. I found this to be a quirk of Node: launching a separate worker script (by setting the cluster settings) instead of forking the same process N times prevented me from communicating with the worker processes at all. For example, when I launched the load balancer and passed in four worker processes defined in separate JavaScript files, the main load balancer could not communicate with them, even when I passed a reference to the cluster into each worker process. The only way I could get communication to work was to fork the workers from the same Node cluster. I found this behavior really strange, and it makes for some odd-looking code: the default pattern for Node clustering is a big if statement in a single JavaScript file that represents not only the cluster master but also all of its workers.
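For reference, the skeleton that pattern forces on you looks roughly like this; it's a stripped-down sketch of the generic cluster idiom, not the balancer itself:

```javascript
var cluster = require('cluster');

if (cluster.isMaster) {
  // cluster.fork() re-executes this very file as a child process.
  var worker = cluster.fork();
  // IPC only works between the master and the workers it forked itself.
  worker.send({ data: 'hello', rinfo: {} });
} else {
  process.on('message', function (msg) {
    console.log('worker %d got: %s', cluster.worker.id, msg.data);
  });
}
```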
- Offers a generic round-robin load balancer irrespective of protocol
The distribute function bound to the load balancer prototype is called whenever data is received by the master process. I added a default implementation so users get sensible behavior out of the box, but it is easy to overwrite: if you want a different load balancing strategy, just bind a new distribute() function to the prototype and everything else keeps working.
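For example, a drop-in replacement that picks a worker at random instead of cycling through them sequentially could look something like this (purely illustrative, and omitting the WorkerDownError handling from the default for brevity):

```javascript
// Hypothetical override: random distribution with the same (workers, data) signature.
LoadBalancer.prototype.distribute = function (workers, data) {
  var worker = workers[Math.floor(Math.random() * workers.length)];
  if (worker) {
    worker.send(data);
  }
};
```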
The last implementation step for the user is to extend the load balancer and specify what the workers do whenever data is received across the wire. An example implementation for UDP might look like this:
```javascript
var cluster = require('cluster'),
    LoadBalancer = require('./LoadBalancerPrototype.js'),
    UDPWorker = require('../workers/UDPWorker.js');

var UDPLoadBalancer = function(name, port, worker_name, instances, retryonerror) {
  LoadBalancer.call(this, name, port, 'udp', worker_name, instances, retryonerror);
  console.log('Instantiating UDP Load Balancer on port %d with %d workers', port, instances);
};

UDPLoadBalancer.prototype = Object.create(LoadBalancer.prototype);
UDPLoadBalancer.prototype.name = 'UDP Load Balancer';

// CONFIG: to be set by the user - defines the type of worker to fork
UDPLoadBalancer.prototype.worker = UDPWorker;

// Custom distribution function - sequential by default.
// Overwrite this function to specify how you want to distribute the load.
// The shape of data depends on the protocol of the Load Balancer.
// This is just an example - it is not strictly necessary.
// UDPLoadBalancer.prototype.distribute = function(workers, data) {
// };

if (cluster.isMaster) {
  var port = 5005,
      instances = 2,
      retry = false;

  // Minimal command-line parsing: -p <port>, -i <instances>, -r to retry on error
  for (var i = 0; i < process.argv.length; i++) {
    if (process.argv[i] == '-p') {
      port = process.argv[i + 1] ? process.argv[i + 1] : port;
    }
    if (process.argv[i] == '-i') {
      instances = process.argv[i + 1] ? process.argv[i + 1] : instances;
    }
    if (process.argv[i] == '-r') {
      retry = true;
    }
  }

  var udpLoadBalancer = new UDPLoadBalancer('UDP Balancer 1', port,
    UDPWorker.prototype.name, instances, retry);

  udpLoadBalancer.start(function(err, msg) {
    if (err) {
      console.log(UDPLoadBalancer.prototype.name + ' error starting up: ' + err);
      udpLoadBalancer.stop(function(err, msg) {
        console.log('UDP load balancer shut down: ' + err + ' ' + msg);
      });
    } else {
      // started successfully
    }
  });
} else {
  // Worker process: instantiate the configured worker type and wait for messages
  var worker = new UDPLoadBalancer.prototype.worker(cluster.worker, cluster.worker.id);

  cluster.worker.on('online', function() {
    console.log('[%s:%d] is online.', worker.name, worker.instance);
  });

  // msg is of format {data: udp_msg, rinfo: rinfo}
  cluster.worker.on('message', function(msg) {
    worker.doWork(msg.data, msg.rinfo);
  });
}

module.exports = UDPLoadBalancer;
```
And here’s an example implementation of a worker that handles the load:
```javascript
var WorkerPrototype = require('./WorkerPrototype.js'),
    WorkerDownError = require('../exceptions/WorkerDownError.js'),
    Decoder = require('../controllers/Decoder.js');

var UDPWorker = function(worker, instance) {
  WorkerPrototype.call(this, worker, UDPWorker.prototype.name, instance);

  // Implementations need to specify what work to do upon receiving a message
  this.doWork = function(msg, rinfo) {
    if (!this.worker || this.worker.suicide === true) {
      throw new WorkerDownError('[%s:%d] Worker is down. Not doing work.', this.name, this.instance);
    }
    console.log('[%s:%d] Doing work [%d].', this.name, this.instance, this.counter);
  };
};

UDPWorker.prototype.name = 'UDP Worker';

UDPWorker.prototype.getPath = function () {
  var fullPath = __filename;
  return fullPath;
};

module.exports = UDPWorker;
```
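To exercise the balancer, I just point a throwaway UDP client at its port. Something along these lines works (the address, port, and payload here are arbitrary and assume the balancer is running locally on its default port 5005):

```javascript
var dgram = require('dgram');

var client = dgram.createSocket('udp4');
var payload = new Buffer('test analytics packet');
var sent = 0;

// Fire ten datagrams at the balancer, 250 ms apart, then close the socket.
var timer = setInterval(function () {
  client.send(payload, 0, payload.length, 5005, '127.0.0.1', function (err) {
    if (err) console.log('send error: ' + err);
    if (++sent === 10) {
      clearInterval(timer);
      client.close();
    }
  });
}, 250);
```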
That’s pretty much all there is to it. You can see an example of messages being sent across the wire and how load is distributed for UDP:
```
Instantiating Worker UDP Worker 0
Instantiating Worker UDP Worker 1
Instantiating Worker UDP Worker 2
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:35 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866055940.
[UDP Balancer 1] Received a message from 1.1.0.2:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056190.
[UDP Balancer 1] Received a message from 1.1.0.3:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [0].
[DECODER]: Decoding udp message with timestamp: 1409866056439.
[UDP Balancer 1] Received a message from 1.1.0.4:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056687.
[UDP Balancer 1] Received a message from 1.1.0.5:65432 @ [Thu Sep 04 2014 14:27:36 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866056937.
[UDP Balancer 1] Received a message from 1.1.0.6:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [1].
[DECODER]: Decoding udp message with timestamp: 1409866057187.
[UDP Balancer 1] Received a message from 1.2.0.1:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057214.
[UDP Balancer 1] Received a message from 1.1.0.7:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057437.
[UDP Balancer 1] Received a message from 1.2.0.2:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [2].
[DECODER]: Decoding udp message with timestamp: 1409866057463.
[UDP Balancer 1] Received a message from 1.1.0.8:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057687.
[UDP Balancer 1] Received a message from 1.2.0.3:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057714.
[UDP Balancer 1] Received a message from 1.1.0.9:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [3].
[DECODER]: Decoding udp message with timestamp: 1409866057938.
[UDP Balancer 1] Received a message from 1.2.0.4:65432 @ [Thu Sep 04 2014 14:27:37 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:2] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866057965.
[UDP Balancer 1] Received a message from 1.1.0.10:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:3] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058188.
[UDP Balancer 1] Received a message from 1.2.0.5:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:1] Doing work [4].
[DECODER]: Decoding udp message with timestamp: 1409866058215.
[UDP Balancer 1] Received a message from 1.1.0.11:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058437.
[UDP Balancer 1] Received a message from 1.2.0.6:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058465.
[UDP Balancer 1] Received a message from 1.1.0.1:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:1] Doing work [5].
[DECODER]: Decoding udp message with timestamp: 1409866058655.
[UDP Balancer 1] Received a message from 1.1.0.12:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [196] bytes.
[UDP Worker:2] Doing work [6].
[DECODER]: Decoding udp message with timestamp: 1409866058689.
[UDP Balancer 1] Received a message from 1.2.0.7:65432 @ [Thu Sep 04 2014 14:27:38 GMT-0700 (PDT)] of size [204] bytes.
[UDP Worker:3] Doing work [6].
```