Enable delayed messages in Zend_Queue
The default Zend_Queue Db adapter unfortunately does not let you pass a timeout value when saving a message on the queue. All is not lost, though: you can easily extend the standard Zend classes to add that functionality.
All you need is your own Db adapter and Queue class.
For the adapter you only need to override the send() function. Only two lines differ from the original class: the method signature, which gains a $timeout parameter, and the assignment $msg->timeout = $timeout. You could apply the same change directly to the library class instead of extending it, but a subclass will be easier to live with if you ever need to update your Zend library.
<?php
class TS_Queue_Adapter_Db extends Zend_Queue_Adapter_Db {
    /**
     * Send a message to the queue
     *
     * @param string $message Message to send to the active queue
     * @param Zend_Queue $queue
     * @param int|null $timeout Value stored in the message's timeout column
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception - database error
     */
    // Changed line 1 of 2: the signature accepts an optional $timeout.
    public function send($message, Zend_Queue $queue = null, $timeout = null){
        if ($this->_messageRow === null) {
            $this->_messageRow = $this->_messageTable->createRow();
        }
        if ($queue === null) {
            $queue = $this->_queue;
        }
        if (is_scalar($message)) {
            $message = (string) $message;
        }
        if (is_string($message)) {
            $message = trim($message);
        }
        if (!$this->isExists($queue->getName())) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
        }
        $msg = clone $this->_messageRow;
        $msg->queue_id = $this->getQueueId($queue->getName());
        $msg->created = time();
        $msg->body = $message;
        $msg->md5 = md5($message);
        // Changed line 2 of 2: store the timeout passed by the caller.
        $msg->timeout = $timeout;
        try {
            $msg->save();
        } catch (Exception $e) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
        }
        $options = array(
            'queue' => $queue,
            'data' => $msg->toArray(),
        );
        $classname = $queue->getMessageClass();
        if (!class_exists($classname)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }
        return new $classname($options);
    }
}
Your Queue class only needs to override the send() function as well, so that you can pass the timeout through to the adapter. Obviously you can use this class to add other functionality too.
<?php
class TS_Queue extends Zend_Queue {
    /**
     * Send a message to the queue
     *
     * @param mixed $message message
     * @param int|null $timeout Value passed through to the adapter
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception
     */
    public function send($message, $timeout = null){
        return $this->getAdapter()->send($message, null, $timeout);
    }
}
Doing delayed messages this way has some significant disadvantages. You are going to run into quite a few issues with this approach as your queues grow larger.
If you look at how the Db adapter selects data, you will find that delayed messages stay in the table longer, so the table holds more rows and the select query slows down. Also, the timeout column is more or less used for flushing items out of the queue, not for a delay.
A better way to do this is to create a new table for scheduled messages and give each message a timestamp saying when it can be processed. With an index on that timestamp column, a range query can select the due messages quickly, and you can partition the table by range as well.
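The separate-table idea in the comment above could be sketched roughly as follows. This is only an illustration under assumptions: the table name scheduled_message, the column deliver_at, the index name, and the helper functions createScheduledSchema(), scheduleMessage() and fetchDueMessages() are all hypothetical, and an in-memory SQLite database accessed through PDO stands in for whatever database the queue really uses. None of it is part of Zend_Queue.

```php
<?php
// Hypothetical sketch: a dedicated table for scheduled messages, selected by
// a range query on an indexed delivery timestamp. All names are assumptions.

/** Create the (assumed) schema on the given connection. */
function createScheduledSchema(PDO $db)
{
    $db->exec(
        'CREATE TABLE scheduled_message (
            message_id INTEGER PRIMARY KEY AUTOINCREMENT,
            body       TEXT    NOT NULL,
            deliver_at INTEGER NOT NULL
        )'
    );
    // The index lets the database answer "deliver_at <= now" as a range scan.
    $db->exec('CREATE INDEX idx_scheduled_deliver_at ON scheduled_message (deliver_at)');
}

/** Store a message that must not be processed before $deliverAt (Unix time). */
function scheduleMessage(PDO $db, $body, $deliverAt)
{
    $stmt = $db->prepare('INSERT INTO scheduled_message (body, deliver_at) VALUES (?, ?)');
    $stmt->execute(array((string) $body, (int) $deliverAt));
}

/** Return the bodies of up to $limit messages due at $now, earliest first. */
function fetchDueMessages(PDO $db, $now, $limit = 10)
{
    $stmt = $db->prepare(
        'SELECT body FROM scheduled_message
          WHERE deliver_at <= :now
          ORDER BY deliver_at
          LIMIT :lim'
    );
    $stmt->bindValue(':now', (int) $now, PDO::PARAM_INT);
    $stmt->bindValue(':lim', (int) $limit, PDO::PARAM_INT);
    $stmt->execute();
    return $stmt->fetchAll(PDO::FETCH_COLUMN);
}

$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
createScheduledSchema($db);

$now = 1000000; // fixed timestamp so the demo is repeatable
scheduleMessage($db, 'due already', $now - 60);
scheduleMessage($db, 'due in five minutes', $now + 300);

$due = fetchDueMessages($db, $now);
```

With the fixed $now, $due holds only the first message; the second one is returned once the time passed to fetchDueMessages() reaches its deliver_at. Deleting or marking processed rows, and any locking between concurrent workers, is left out of this sketch.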
Thanks for the comment, Mike.
I was going to have more than one queue table anyway; this was just a quick fix to see how hard it would be to add the delayed messages functionality.
A note on multiple tables: our scheduled message setup looks something like this:
Zend/Queue/ScheduledMessage.php
- Basically has some getters and setters to enable our deliveryTime column
Zend/Queue/Adapter/Scheduled.php
- Overrides the Db adapter to set our options and implement our send and receive
Zend/Queue/Adapter/Scheduled/Message.php
- The database table
That should give you the basic idea of what is needed to swap out existing functionality and add new functionality.
Overall, it is a bit tedious but it does work… our only complaint is that Zend_Queue works more in batches than anything else, which to us means less flexibility. We are actually going to be moving to beanstalkd, which supports delays natively and works much like Gearman does.