My first Node App – Single file message queue

CreditorWatch connects to a government service and they have restricted us to a single concurrent connection via our frame relay connection, i.e. if we try to do two concurrent socket connections the second socket connect will not receive any data back. It is possible to use PHP, though this seemed like the wrong tool. Node offers the ability to run a continuous running bare basic server, avoiding Apache etc.

What we wanted to do is a little tricky, normally with a queue you send the queue worker a task and then continue on. In this case we send the queue the request and wait for the result. It works like this:

  • The requestor sends a request
  • The queue queues the request for processing
  • The queue moves through the queue
  • The queue returns the response to the requestor

The example below doesn’t do anything except return the sent message, in our production environment we send the message via Telnet and send back the response.

//require the net services
var net = require('net');
//queue for processing
var queue = new Array();
//make sure there is unique keys for each queue element
var used_keys = new Array();
//the finished items queue
var finished_queue = new Array();
//keep a count of requests
var request_count = 0;
//start the queue processor - below
process_queue();

//start a server for the other apps to connect to via a socket connection on port 1337
var server = net.createServer(function (socket) {
	//create a unique key
	var request_id='a'+(++request_count)+Math.ceil(Math.random()*3000);
	//check the key is not in use
	while(used_keys[request_id]!=undefined)
	{
		request_id='a'+(request_count)+Math.ceil(Math.random()*3000);
	}//if it is in use - somehow - create another
	//set the key as in use
	used_keys[request_id]=true;
	//log the details
	console.log('New Socket Conn');
	console.log('Request #' + request_count);
	//wait for the socket to send a message
	socket.on('data', function(data) {
		//log the data sent
		console.log('The Request: '+data.toString());
		//add the message to the back of the queue - bonus tip - you could set a priority option here
		queue.push({'req_id' : request_id, 'msg' : data.toString()});
		//call the watcher to see if there is a response - passing this socket details
		get_data(request_id,socket);
	});
	//log when the socket is closed
	socket.on('close', function(data) {
		console.log('Connection closed');
		console.log(process.memoryUsage());
	});
});
//init the listener
server.listen(1337, '127.0.0.1');
//the watcher function to check if there is a response
function get_data(req_id, socket)
{
	//the function we will loop back on - a while loop blocks
	function sub_get_data()
	{
		//is the queued item processed
		if(finished_queue[req_id]!=undefined)//yes
		{
			console.log('Have found request #' + req_id);
			//send the response to the requestor and close the connection
			socket.write(''+finished_queue[req_id]);
			socket.end();
			//garbage clean uo
			delete finished_queue[req_id];
			delete used_keys[req_id];
		}
		else//no
		{
			console.log('Havent found request #' + req_id);
			//in 20th of a second check again
			setTimeout(sub_get_data, 50);
		}
	}
	//init the watcher loop
	sub_get_data();
}

//the queue which keeps looping every 10th of a second to check if there is new items
function process_queue()
{
	//console.log('The queue is running n');
	//loop through the queue
	while(queue.length)
	{
		//not sure why i put in the if???
		if(queue.length)
		{
			console.log('There is ' + queue.length + ' items in the queue');
			//get the first item in the queue and remove it from the queue
			item=queue.shift();
			//process the item and add it to the finished queue for the get_data watcher to pick up
			finished_queue[item.req_id]=do_something_or_nothing(item.msg);
			console.log('The response for request #'+item.req_id+': '+finished_queue[item.req_id]);
			console.log('There is ' + finished_queue.length + ' items in the finished queue');
		}
	}
	//console.log('The queue is taking a break n');
	//take a break - 10th of second
	setTimeout(process_queue, 100)
}

function do_something_or_nothing(msg)
{
	return msg;//yeah we did nothing - thought you could do a lot
}

It is always good to test and test we did. The below script sends a random number to the Node app and checks that the random number returned is the same sent. We set up 20 terminal screens to run 100 continuous cURL calls to the below to confirm the queue is sending the response back to the requestor. We expect a max of 5 concurrent connections to this specialist service in the same second so 20 is a prefect test.

<?php
//random number
$send=rand(0, 99999);
//the connection settings
$port=1337;
$ip='127.0.0.1';
//open the socket to the node server
$fp = fsockopen($ip, $port,  $errno, $errstr, 5);
//send a message to the socket connection
fwrite($fp, $send);
$msg='start';
$content='';

//get the message sent back
$msg=fgets($fp);

//check the message returned is the message returned
if($msg==$send)//a match
{
	echo '
	We are happy
';
}
else//something is wrong
{
	echo '
	Oh ' .$send . ' - ' . $msg . ' didnt match
';//hopefully wont happen
}
//close the connection
fclose($fp);
Advertisements