Working with PHP and Beanstalkd

I have just introduced Beanstalkd into my current PHP project; it was super-easy so I thought I'd share some examples and my thoughts on how a job queue fits in with a PHP web application.

The Scenario

I have an API backend and a web frontend on this project (there may be apps later. It's a startup, there could be anything later). Both front and back ends are PHP Slim Framework applications, and there's a sort of JSON-RPC going on in between the two.

The job queue will handle a few things we don't want to do in real time on the application, such as:

  • updating counts of things like comments; when a comment is made, a job gets created and we can return to the user. At some point the job will get processed updating the counts of how many comments are on that thing, how many comments the user made, adding to a news feed of activities ... you get the idea.
  • cleaning up; we have had a few cron jobs running to clean up old data, but now those cron jobs put jobs into beanstalkd, which gives us a bit more visibility and control of them, and also means that those big jobs aren't running on the web servers (we have a separate worker server).
  • other periodic things, like updating incoming data/content feeds or talking to some of the 3rd party APIs we use, like Mailchimp and Bit.ly.

Adding Jobs to the Queue

There are two ends to this process; let's start by adding jobs to the queue. Anything you don't want to make a user wait for is a good candidate for a job. As I mentioned, some of our jobs are created periodically by cron, but since they are just beanstalkd jobs I can easily provide an admin interface to trigger them manually as well. In this case, I'm just making a job to process the things we update when a user makes a comment.

A good job is very self-contained; a bit like a stateless web request, it should carry everything that is needed to process it and not rely on anything that went before. On a live platform you would typically have many workers all consuming jobs from a single queue, so there is no guarantee that one job will be completed before the next one begins to be processed! You can put any data you like into a job; you could include all the fields needed to fill in and send an email template, for example.
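To make that concrete, here's a sketch of what a self-contained email job's payload might look like (the field names here are made up for illustration, not taken from my application):

```php
// A self-contained job payload: everything the worker needs travels with
// the job, so it can be processed without any other state.
$job = array(
    "action" => "send_welcome_email",         // which task the worker should run
    "data"   => array(
        "to"       => "new.user@example.com", // all fields needed to build the mail
        "name"     => "New User",
        "template" => "welcome",
    ),
);

// Beanstalkd job bodies are opaque strings, so serialise to JSON
$payload = json_encode($job);

// Any worker can rebuild the full structure from the payload alone
$received = json_decode($payload, true);
```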

In this example I need to talk to the database anyway so I'm just storing information about which task should be done and including the comment ID with it.

I'm using an excellent library called Pheanstalk which is well-documented and available via Composer. The lines I added to my composer.json:

  "require": {
    "pda/pheanstalk": "2.1.0"
  }

I start by creating an object which connects to the job server and allows me to put jobs on the queue:

    $queue = new Pheanstalk_Pheanstalk(
        $config['beanstalkd']['host'] . ":" . $config['beanstalkd']['port']
    );

The config settings there will change between platforms but for my development version of this project, beanstalkd is just running on my laptop so my settings are the defaults:

[beanstalkd]
host=127.0.0.1
port=11300

Once you have the object created ($queue in my example), you can easily add jobs with the put() command - but first you specify which "tube" to use. Tubes are what other tools would call queues: just a way of putting jobs into different areas. It is possible to ask workers to listen on specific tubes, so you can have specialised workers if needed. Beanstalkd also supports adding jobs with different priorities.

Here's adding the simple job to the queue; the data is just a string so I'm using json_encode to wrap up a couple of fields:

  $job = array("action" => "comment_added",
    "data" => array("comment_id" => $comment_id));
  $queue->useTube('mytube')->put(json_encode($job));
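The put() call above uses the defaults, but jobs can also be given an explicit priority, delay and time-to-run (TTR). A sketch, assuming the Pheanstalk 2.x put() signature where these follow the payload (this needs a running beanstalkd to actually execute):

```php
// Optional arguments to put(): priority, delay, TTR (Pheanstalk 2.x)
$queue->useTube('mytube')->put(
    json_encode($job),
    1024, // priority: 0 (most urgent) to 4294967295; 1024 is the default
    30,   // delay: seconds before the job becomes available to workers
    60    // TTR: seconds a worker may hold the job before beanstalkd re-releases it
);
```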

I wrote a bit in a previous post about how to check the current number of jobs on beanstalkd, so you can use those instructions to check that you have jobs stacking up. To actually consume those jobs, we'll need to write a worker.
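Pheanstalk can also report those numbers directly; a sketch, assuming the 2.x statsTube() method and the stat names from the beanstalkd protocol (again, this needs a running server):

```php
// stats-tube gives per-tube counters; the response supports array access
$stats = $queue->statsTube('mytube');
echo "ready jobs:  " . $stats['current-jobs-ready'] . "\n";
echo "buried jobs: " . $stats['current-jobs-buried'] . "\n";
```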

Taking Jobs Off The Queue

The main application and the worker scripts don't need to be in the same technology stack since beanstalkd is very lightweight and technology agnostic. I'm working with an entirely PHP team though so both the application and the workers are PHP in this instance. The workers are simply command-line PHP scripts that run for a long time, picking up jobs when they become available.

For my workers I have added the Pheanstalk libraries via Composer again and then my basic worker script looks like this:

require("vendor/autoload.php");

$queue = new Pheanstalk_Pheanstalk(
    $config['beanstalkd']['host'] . ":" . $config['beanstalkd']['port']
);

$worker = new Worker($config);

// Set which queues to bind to
$queue->watch("mytube");

// pick a job and process it
while($job = $queue->reserve()) {
    $received = json_decode($job->getData(), true);
    $action   = $received['action'];
    if(isset($received['data'])) {
        $data = $received['data'];
    } else {
        $data = array();
    }

    echo "Received a $action (" . current($data) . ") ...";
    if(method_exists($worker, $action)) {
        $outcome = $worker->$action($data);

        // how did it go?
        if($outcome) {
            echo "done \n";
            $queue->delete($job);
        } else {
            echo "failed \n";
            $queue->bury($job);
        }
    } else {
        echo "action not found\n";
        $queue->bury($job);
    }

}

Here you can see the Pheanstalk object again, but this time we use some different commands:

  • reserve() picks up a job from the queue and marks it as reserved so that no other workers will pick it up
  • delete() removes the job from the queue when it has been successfully completed
  • bury() sets the job aside as terminally failed; no worker will pick it up again unless it is manually "kicked" back into the queue.

The other alternative outcome is to do neither; once the job's time-to-run (TTR) expires, beanstalkd will release it to be retried later.

Once one job has been processed, the worker will pick up another, and so on. With multiple workers running, they will all just pick up jobs in turn until the queue is empty again.
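One refinement worth knowing about: reserve() blocks indefinitely by default. Assuming the Pheanstalk 2.x behaviour where reserve() takes an optional timeout in seconds and returns false when it expires, the loop can wake up periodically - for example to check a (hypothetical) shutdown flag so deploys can stop workers cleanly:

```php
// Wake up every few seconds instead of blocking forever on reserve()
while (true) {
    $job = $queue->reserve(5); // false if no job arrived within 5 seconds
    if ($job === false) {
        if (file_exists('/tmp/stop-workers')) { // hypothetical flag file
            break;
        }
        continue;
    }
    // ... process $job as in the loop above, then delete() or bury() it
}
```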

The Worker class really doesn't have much that is beanstalkd-specific. The constructor connects to MySQL and also instantiates a Guzzle client, which is used to hit the backend API of the application for tasks where the full application framework and config are needed to do the work; we create endpoints for those, and the worker has an access token so it can make the requests. Here's a snippet from the Worker class:

class Worker
{
    protected $config;
    protected $db;
    protected $client;

    public function __construct($config) {
        $this->config = $config;
        // connect to mysql
        $dsn = 'mysql:host=' . $config['db']['host'] . ';dbname=' . $config['db']['database'];
        $username = $config['db']['username'];
        $password = $config['db']['password'];
        $this->db = new \PDO($dsn, $username, $password,
            array(PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES utf8"));

        $this->client = new \Guzzle\Http\Client($config['api']['url']);
    }

    public function comment_added($data) {
        $comment_sql = "select * from comments where comment_id = :comment_id";
        $comment_stmt = $this->db->prepare($comment_sql);
        $comment_stmt->execute(array("comment_id" => $data['comment_id']));
        $comment = $comment_stmt->fetch(PDO::FETCH_ASSOC);

        if($comment) {
            // more SQL to update various counts
        }
        return true;
    }

    // ... more task methods like this one
}

There are various different tasks here that call out to either our own API backend, or to MySQL as shown here, or to something else.

Other Things You Should Probably Know

Working with workers, I often find myself doing one of these:

  1. forget to start the worker and then wonder why nothing is working
  2. forget to restart the worker when I deploy new code and then wonder why nothing is working

Beanstalkd doesn't really have access control so you will want to lock down what can talk to your server on the port it listens on. It's a deliberately lightweight protocol and I like it, but do double check that it isn't open to the internet or something!

Long-running PHP scripts aren't the most robust thing in the world. I recommend running them under the tender loving care of supervisord (which I wrote about previously); this has the added advantages of a really easy way to restart your workers and good logging. You should probably also include a lot more error handling than I have in the scripts here; I abbreviated to keep things readable.
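For reference, a minimal supervisord program entry might look like this (the paths and program name are examples, not from my setup):

```ini
; Start the worker at boot, restart it if it dies, and capture its output
[program:myworker]
command=php /var/www/app/worker.php
numprocs=2                                   ; run two workers side by side
process_name=%(program_name)s_%(process_num)d
autostart=true
autorestart=true
stdout_logfile=/var/log/myworker.log
redirect_stderr=true
```

With this in place, `supervisorctl restart myworker:*` is all a deploy script needs to pick up new code.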

What did I miss? If you're working with Beanstalkd and PHP and there's something I should have mentioned, please share it in the comments. This was my first beanstalkd implementation but I think it's the first of many - it was super-easy to get started!

26 thoughts on “Working with PHP and Beanstalkd”

  1. Hi Lorna,

    Something to remember that is often overlooked, and worth pointing out to the security conscious, is that daemons like beanstalkd run in an insecure fashion. Even though the daemon is bound to 127.0.0.1 in your example, any minor compromise of the host would allow anything to be popped in and out of the queue without any privilege escalation needed.

    You should use the owner module in iptables (or similar depending on your environment) to prevent any processes not owned by your application specific php5-fpm user from being able to connect to that port.

    In a distributed architecture this is even more important, because you have to configure the firewall of the beanstalkd host and each client that's allowed to connect to the host.

    Apologies if this is too deep and not applicable!

    Steve

    • Thanks Steve! I did try to mention the security risks but your explanation is comprehensive so thanks for chiming in. I don't think it's ever possible to have too much information about possible security issues.

  2. Small typo :
    [code]$queue->useTube('mytube')->put(json_encode($data));[/code]
    should say
    [code]$queue->useTube('mytube')->put(json_encode($job));[/code]
    i believe.

  3. Pingback: PHPDeveloper.org: Lorna Mitchell: Working with PHP and Beanstalkd

  4. I played around with beanstalkd a lot and I can't recommend it any more. If you only have a small number of tasks it's fine, but as soon as you want to do more with it, the performance will bite you in the neck. Besides that, a lot of people are already using Redis to cache things, and you could easily implement this task queue with a Redis backend too - with the side effect that you have a lot more performance reserves left...

    Don't worry if you never plan to add more than 1000 tasks per second to your queue...

      • Thanks for sharing your production-platform experiences with beanstalkd, it's so useful to have some real experience. My application isn't under a lot of load yet but I have made a note to come back and add to this thread in time. I think even with the more pessimistic set of numbers, 1000 jobs per second (I only create jobs on write, and not all writes) is probably plenty of capacity for the vast majority of PHP applications.

      • It does not matter if your modern server has an SSD or not; beanstalkd is memory/CPU bound, not disk bound, if you don't need any persistence. So you had 25k/sec - how big was your avg. message? I saw a somewhat hard barrier at about 20MB/s traffic for about 10K messages with an avg. size of 2KB. With Redis I was able to increase the performance by a factor of 10 on the same hardware.

  5. Pingback: Working with PHP and Beanstalkd | Advanced PHP ...

  6. Pingback: Programowanie w PHP » Blog Archive » Lorna Mitchell: Working with PHP and Beanstalkd

  7. Hi Lorna,

    Have you thought about the strong coupling of jobs and deployment revision/version? Say you update your worker class such that a "Job" from a previous revision which was perfectly valid is now incompatible with your changes. To solve this one would probably want to have workers running for both versions until all Jobs from the old revision have been processed.

    To achieve this your framework could then transparently append a deployment revision to all queue/tube names. Looking at the beanstalk protocol your worker manager program could then use the "list-tubes" to spawn workers of a specific type and revision to "mop up" jobs from a previous revision.

    This all sounds like quite a lot of work for something I assume somebody has already looked into, but looking online I couldn't find anything.

    • I don't think I've thought about this as much as you have :) The queues on my application are basically always empty apart from specific under-load times, so it would be unlikely (but never impossible) that a queued job could get dropped. To solve the problem you describe I think I'd want to do a 2-step deploy, one step to start creating the new job type and introduce worker functionality to handle it, and a second deploy to remove the old worker functionality once we're sure the queue is cleared of any of those old jobs - which is basically what I think you're describing. Thanks for the comment!

      • Your approach sounds extremely reasonable, and is somewhat equivalent to the versioning that the maintainers of public APIs are forced to perform when making major upgrades. In this situation you (the maintainer) are in total control of both the caller and the called, so it feels like a more sophisticated solution would be possible.

        In a situation where you are directly changing the message format for the queue it would be expected that you'd realize that changes to both the producer/consumer would be required. Considering a separate situation where instead you add extra fields to a class, you may not even initially think to look at an async task that serializes/json_encodes the class. If your consumer expects the fields to be present (or calls any method on the class which does) when dealing with an "old" message it MAY give unexpected results which are hard to predict, and something which normal testing wouldn't pick up on.

        As you say, there is only a small probability of such an event occurring. However, if you find yourself in a situation where the slow operation matters so much to the user that they would rather it work correctly than super fast, you may be better off keeping it synchronous.

  8. Thanks for the nice write up! I managed to install beanstalkd on my server and I'm wondering how to display (all) jobs from a specific tube. I know that AWS SQS supports it and would know how to implement it, but I haven't found the obvious for beanstalkd yet. Any pointer in the right direction would be very helpful.

  9. Hi Lorna,
    Thanks for the great write up. I am looking for a job queue system to integrate in one of my php applications and your post shed some light in the possibilities offered by beanstalkd.
    It seems a pretty easy way to get the work done and appear to be one of the easiest possibilities in the php world.
    Looking forward to implementing it in my stack.

  10. Hi, I'm the author of QuTee (https://github.com/anorgan/QuTee), a PHP library for queuing tasks and processing them. I've implemented Redis and MySql backed queues, and am in the process of implementing beanstalk, but I didn't know about priorities in beanstalk, so this post really helped! Also, Adrian made a good point in tying jobs to deployment version.

  11. Great Article. But I was wondering how to make this worker run constantly on my server? How can I make a daemon or a constant thing running without running out of my memory?

  12. Tirthesh,..

    Take care of this: "The workers are simply command-line PHP scripts that run for a long time, picking up jobs when they become available."

    # E.g., from your remote server
    user@myserver$ php some-worker.php

    Regards

  13. Be careful with buried jobs (that are never kicked) or unprocessed jobs (because their tube is not listened to) in combination with beanstalkd persistence (-b). The persistence file cleanup stops at the point where there is still a job to process; if this is an old job, the persistence files grow and grow until the old jobs are processed.

  14. I've been playing around with beanstalkd and some simple workers and want to implement it in one of my company's application which is basically nothing more than a newsletter application but which has multiple instances with different databases per client on a single server.

    Would it be better to run separate workers per client / database, each having their own queue/tube to work with, or a single worker that just uses one tube and is able to switch databases depending on some form of client ID or database ID in the message ?

    I first thought running a single worker through a list of tubes, one per client, but that seemed to undermine the idea of having an asynchronous queue in the first place.

    • I'd probably go for separate workers and tubes. That way it's easy to add or remove individual ones, or to split them between platforms if you need to scale up in the future. If they're really similar you could use a single code base and deploy it lots of times?

  15. Better than what? I'd always recommend a tool like supervisord to capture logs and marshal restarts if needed
