Implementing a message queue on a web-application can often be a way to distribute load across an application, allowing work (the task itself) to be placed in a queue and processed independently.

Creating a Message Queue in PHP

This allows separate processes, often referred to as workers, to read the queue and process the task according to the contents of the message. After they've finished processing the task, the worker process or thread can relay a response further down the chain.

In this article, we're going to look at creating a queue manually using built-in functions of PHP. We'll later look into the Gearman package for PHP and RabbitMQ messaging. Though it's always fundamental to understand the concepts and logic behind a queue.

To implement our queue, we're going to use the Semaphore functions. Although they don't handle concurrent operations as nicely as other solutions, they handle all the overhead we'll need to get started.

The Message Queue Design

To develop our message queue in PHP, we need to split our logic into two main parts.

Firstly, we need some way to add new messages into the queue. And, secondly, we need to come up with our worker processes that'll go through the messages in the queue.

Creating a Queue in PHP

Setting up a message queue in PHP is as easy as coming up with an arbitary number and then accessing a handler for a System V message queue resource. To get started, we'll create a constant file that'll do this for us:

class Queue {
     * Stores our queue semaphore.
     * @var resource
    private static $queue = NULL;
     * getQueue: Returns the semaphore message resource.
     * @access public
    public static function getQueue() {
        # Some unique ID
        define('QUEUE_KEY', 12345);
        # Different type of actions
        define('QUEUE_TYPE_START', 1);
        define('QUEUE_TYPE_END', 2);
        # Setup the queue
        self::$queue = msg_get_queue(QUEUE_KEY);
        # Return the queue
        return self::$queue;

The 'Queue' class above will be our simple way to reference the queue regardless of which stage we are at. We're defining a constant arbitary integer that we'll use as the queue identifier and two integer values that we will use to reference the type of message in the queue.

Setting up Our Messages

Now we've got the Queue class functional, we need to come up with a simple data structure for each of our messages. When we pass the message into the queue, our object will be serialized so we can use any object we need and call any methods on that object at either end.

class Message {
     * @var private
    private $key = '';
    private $data = array();
     * Constructor: Pass over the data we need
    public function __construct($key, $data) {
        $this->key = $key;
        $this->data = $data;    
     * getKey: Returns the key
    public function getKey() {
        return $this->key;

Adding Messages to the Queue

To add messages into our queue, we'll need to put another method into the 'Queue' class that takes a key to identify the message and then an arbitary array of data that'll be passed into our 'Message' class.

 * addMessage: Given a key, store a new message into our queue.
 * @param $key string - Reference to the message (PK)
 * @param $data array - Some data to pass into the message
public static function addMessage($key, $data = array()) {
    # What to send
    $message = new Message($key, $data);
    # Try to send the message
    if(msg_send(self::$queue, QUEUE_TYPE_START, $message)) {
    } else {
        echo "Error adding to the queue";

Finally, Work Through the Message Queue

To work through our message queue, we need to put together a 'Worker' class that'll read each message, process the task and then run a completion method using the 'Message' object we read.

class Worker {
     * Store the semaphore queue handler.
     * @var resource
    private $queue = NULL;
     * Store an instance of the read Message
     * @var Message
    private $message = NULL;
     * Constructor: Setup our enviroment, load the queue and then
     * process the message.
    public function __construct() {
        # Get the queue
        $this->queue = Queue::getQueue();
        # Now process
    private function process() {
        $messageType = NULL;
        $messageMaxSize = 1024;
        # Loop over the queue
        while(msg_receive($this->queue, QUEUE_TYPE_START, $messageType, $messageMaxSize, $this->message)) {
            # We have the message, fire back
             $this->complete($messageType, $this->message);
            # Reset the message state
            $messageType = NULL;
            $this->message = NULL;
     * complete: Handle the message we read from the queue
     * @param $messageType int - The type we actually got, not what we desired
     * @param $message Message - The actual object
    private function complete($messageType, Message $message) {
        # Generic method
        echo $message->getKey();    

Pulling the Queue Together

Now we've got our Queue, Message and Worker classes all sorted out - we just need to pull it all into one. We're going to use the Slim Framework here:

$app = new SlimSlim();
 * Handle all POST to /create
 * @param key string 
$app->post('/create', function() {
    # Get the key
    $key = $_POST['key'];
    # Setup some data
    $data = array(
        'time' => time(),
        'key' => $key,
        'request' => 'start'
    # Add the message into the queue
    Queue::addMessage($key, $data);    
# Run the application

Now we've got a simple REST application that we can send POST requests to /create. To start our worker, we can setup a cron task that calls our new Worker class:

$worker = new Worker;

There we have it, we can add messages to our queue and watch the worker output the keys we add as it starts to process them.

Make your Instagram beautiful. Preview your images before you post them ›