My first Node App – Single file message queue

CreditorWatch connects to a government service that restricts us to a single concurrent connection over our frame relay link: if we open two socket connections at once, the second receives no data back. We could have done this in PHP, but it seemed like the wrong tool. Node lets us run a bare-bones, continuously running server without Apache or similar.

What we wanted to do is a little tricky. Normally with a queue you send the queue worker a task and then carry on. In this case we send the queue the request and wait for the result. It works like this:

  • The requestor sends a request
  • The queue queues the request for processing
  • The queue works through its items one at a time
  • The queue returns the response to the requestor

The example below doesn’t do anything except return the sent message. In our production environment we forward the message via Telnet and send back the response.

//require the net services
var net = require('net');
//queue for processing
var queue = new Array();
//track which keys are in use so each queue element gets a unique key
var used_keys = {};
//the finished items queue
var finished_queue = new Array();
//keep a count of requests
var request_count = 0;
//start the queue processor - below
process_queue();

//start a server for the other apps to connect to via a socket connection on port 1337
var server = net.createServer(function (socket) {
	//create a unique key
	var request_id='a'+(++request_count)+Math.ceil(Math.random()*3000);
	//check the key is not in use
	while(used_keys[request_id]!=undefined)
	{
		request_id='a'+(request_count)+Math.ceil(Math.random()*3000);
	}//if it is in use - somehow - create another
	//set the key as in use
	used_keys[request_id]=true;
	//log the details
	console.log('New Socket Conn');
	console.log('Request #' + request_count);
	//wait for the socket to send a message
	socket.on('data', function(data) {
		//log the data sent
		console.log('The Request: '+data.toString());
		//add the message to the back of the queue - bonus tip - you could set a priority option here
		queue.push({'req_id' : request_id, 'msg' : data.toString()});
		//call the watcher to see if there is a response - passing this socket's details
		get_data(request_id,socket);
	});
	//log when the socket is closed
	socket.on('close', function(data) {
		console.log('Connection closed');
		console.log(process.memoryUsage());
	});
});
//init the listener
server.listen(1337, '127.0.0.1');
//the watcher function to check if there is a response
function get_data(req_id, socket)
{
	//the function we will loop back on - a while loop would block
	function sub_get_data()
	{
		//is the queued item processed
		if(finished_queue[req_id]!=undefined)//yes
		{
			console.log('Have found request #' + req_id);
			//send the response to the requestor and close the connection
			socket.write(''+finished_queue[req_id]);
			socket.end();
			//garbage clean up
			delete finished_queue[req_id];
			delete used_keys[req_id];
		}
		else//no
		{
			console.log('Have not found request #' + req_id);
			//check again in a 20th of a second
			setTimeout(sub_get_data, 50);
		}
	}
	//init the watcher loop
	sub_get_data();
}

//the queue processor, which wakes every 10th of a second to check whether there are new items
function process_queue()
{
	//console.log('The queue is running\n');
	//loop through the queue
	while(queue.length)
	{
		console.log('There are ' + queue.length + ' items in the queue');
		//get the first item in the queue and remove it from the queue
		var item=queue.shift();
		//process the item and add it to the finished queue for the get_data watcher to pick up
		finished_queue[item.req_id]=do_something_or_nothing(item.msg);
		console.log('The response for request #'+item.req_id+': '+finished_queue[item.req_id]);
		//finished_queue uses string keys, so count the keys rather than reading .length
		console.log('There are ' + Object.keys(finished_queue).length + ' items in the finished queue');
	}
	//console.log('The queue is taking a break\n');
	//take a break - 10th of second
	setTimeout(process_queue, 100)
}

function do_something_or_nothing(msg)
{
	return msg;//yeah we did nothing - though you could do a lot
}
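
The watcher and the queue processor above both poll on a timer. As a variation, here is a minimal sketch that hands each queued item a callback, so the response goes straight back to the requestor when its turn is done and nothing runs while the queue is empty. The names enqueue, drain and draining, and the callback-style do_something_or_nothing, are assumptions for illustration, not what we run in production.

```javascript
// Sketch: a serial queue that answers each requestor through a callback
// instead of polling a finished queue. All names here are illustrative.
var queue = [];
var draining = false; // true while a drain pass is scheduled or running

// Stand-in for the real work. It is asynchronous on purpose, to mimic
// a round trip over the single upstream connection.
function do_something_or_nothing(msg, done) {
	setImmediate(function () { done(msg); });
}

// Queue a message; reply(result) is called once it has been processed.
function enqueue(msg, reply) {
	queue.push({ msg: msg, reply: reply });
	if (!draining) {
		draining = true;
		process.nextTick(drain);
	}
}

// Process one item at a time, so only one request is ever in flight.
function drain() {
	var item = queue.shift();
	if (!item) {
		draining = false; // nothing left; the next enqueue restarts the drain
		return;
	}
	do_something_or_nothing(item.msg, function (result) {
		item.reply(result);
		drain();
	});
}
```

To wire this into the server, the 'data' handler would call enqueue(data.toString(), function (result) { socket.end('' + result); }) in place of queue.push and get_data.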

It is always good to test, and test we did. The script below sends a random number to the Node app and checks that the number returned matches the one sent. We set up 20 terminal screens, each running 100 continuous cURL calls to the script, to confirm the queue sends each response back to the right requestor. We expect a maximum of 5 concurrent connections to this specialist service in the same second, so 20 is a perfect test.

<?php
//random number
$send=rand(0, 99999);
//the connection settings
$port=1337;
$ip='127.0.0.1';
//open the socket to the node server
$fp = fsockopen($ip, $port,  $errno, $errstr, 5);
if(!$fp)//could not connect
{
	die('Connection failed: ' . $errstr . ' (' . $errno . ')');
}
//send a message to the socket connection
fwrite($fp, (string)$send);

//get the message sent back
$msg=fgets($fp);

//check the message returned is the message sent
if($msg==$send)//a match
{
	echo '
	We are happy
';
}
else//something is wrong
{
	echo '
	Oh ' .$send . ' - ' . $msg . ' did not match
';//hopefully wont happen
}
//close the connection
fclose($fp);
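
The same check can also be sketched in Node, which makes it easy to fire several requests at once from a single script instead of many terminals. This is a rough equivalent of the PHP test, not the script we actually ran; check_echo and check_many are illustrative names.

```javascript
// Sketch of the test client in Node. Function names are illustrative.
var net = require('net');

// Send a random number to the queue server and hand back what was
// sent and what came back: callback(err, sent, received)
function check_echo(port, host, callback) {
	var sent = String(Math.floor(Math.random() * 100000));
	var received = '';
	var finished = false;
	function finish(err) {
		if (finished) { return; }
		finished = true;
		callback(err, sent, received);
	}
	var socket = net.connect(port, host, function () {
		socket.write(sent);
	});
	socket.setEncoding('utf8');
	// The reply may arrive in more than one chunk, so collect until the server closes.
	socket.on('data', function (chunk) { received += chunk; });
	socket.on('end', function () { finish(null); });
	socket.on('error', function (err) { finish(err); });
}

// Run `count` checks at the same time and report how many replies matched.
function check_many(count, port, host, callback) {
	var remaining = count;
	var matched = 0;
	for (var i = 0; i < count; i++) {
		check_echo(port, host, function (err, sent, received) {
			if (!err && sent === received) { matched++; }
			if (--remaining === 0) { callback(matched); }
		});
	}
}
```

Against the queue above, check_many(20, 1337, '127.0.0.1', function (matched) { console.log(matched + ' of 20 matched'); }) would mirror the 20-terminal test.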

One thought on “My first Node App – Single file message queue”

  1. Cool concept! Definitely seems like a useful piece of code.

    I’m curious about why you didn’t just use an incrementing number for the id. The random number at the end means you might get duplicate ids. Also worth noting that JS arrays don’t handle string keys as well as PHP arrays (they don’t count towards .length for example). You might be better off using plain {} objects (but what you have works too, so maybe “if it ain’t broke, don’t fix it.”)

    I’m not quite sure how the ‘net’ module works (I only ever really use ‘http’), so I’m a noob on this. I know with ‘http’, the ‘data’ event might not be a full “message”. The data could be chunked if it gets very large. Does that happen with TCP requests too, or is it not an issue there?

    One cool thing about Node is it’s very heavily based on events, so the CPU only runs when there is an event to respond to. You might be able to get rid of the setTimeout loop in process_queue, and run the queue processing on process.nextTick() whenever a message comes in through the socket. You’d have to check there isn’t already processing queued up by keeping a flag around that you toggle on when data comes in and off when data is processed. The advantage would be you don’t run an idle loop every 50ms. Probably not an issue though at the current rate you’re running.

    Thanks a ton for posting! It was good reading this and being exposed to some TCP code. I love Node, so I look forward to more!
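
On the chunking question: yes, TCP is a byte stream, so a 'data' event can carry part of a message or several messages at once. The queue above treats each 'data' event as one whole message, which tends to hold for short messages but is not guaranteed. Here is a minimal sketch of one common fix, assuming the requestor ends every message with a newline (an assumption; the test script above does not do this). make_line_splitter is an illustrative name.

```javascript
// Sketch: split a TCP byte stream into newline-terminated messages.
// Assumes ASCII data or a socket with setEncoding('utf8') applied, so that
// a multi-byte character is never cut in half between chunks.
function make_line_splitter(on_message) {
	var buffer = '';
	return function (chunk) {
		buffer += chunk.toString();
		var index;
		// Emit every complete message; keep any trailing partial one buffered.
		while ((index = buffer.indexOf('\n')) !== -1) {
			on_message(buffer.slice(0, index));
			buffer = buffer.slice(index + 1);
		}
	};
}
```

In the server, socket.on('data', make_line_splitter(function (msg) { ... })) would then queue msg only once a full line has arrived.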
