Guru - Multiplexing

[The following mini-article is something that I wrote for the International PHP Magazine a while back, as part of the 'ask a guru' column; I'm re-publishing it here because it's useful and because people have asked me about the topic twice in the last two days]

Question:

Is there a way to do a form of threading in PHP?

Say for instance you write a PHP application to monitor a service on a number of servers, it would be nice to be able query a number of servers at the same time rather then query them one-by-one.

Can it be done?

Answer:

People often assume that you need to fork or spawn threads whenever you need to do several things at the same time - and when they realize that PHP doesn't support threading they move on to something less nice, like perl.

The good news is that in the majority of cases you don't need to fork or thread at all, and that you will often get much better performance for not forking/threading in the first place.

Say you need to check up on web servers running on a number of hosts to make sure that they are still responding to the outside world. You might write a script like this:

<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
foreach ($hosts as $host) {
    $errno = 0;
    $errstr = "";
    $s = fsockopen($host, 80, $errno, $errstr, $timeout);
    if ($s) {
        $status[$host] = "Connected\\n";
        fwrite($s, "HEAD / HTTP/1.0\\r\\nHost: $host\\r\\n\\r\\n");
        do {
            $data = fread($s, 8192);
            if (strlen($data) == 0) {
                break;
            }
            $status[$host] .= $data;
        } while (true);
        fclose($s);
    } else {
        $status[$host] = "Connection failed: $errno $errstr\\n";
    }
}
print_r($status);
?>

This works fine, but since fsockopen() doesn't return until it has resolved the hostname and made a successful connection (or waited up to $timeout seconds), extending this script to monitor a larger number of hosts makes it slow to complete.

There is no reason why we have to do it sequentially; we can make asynchronous connections - that is, connections where we don't have to wait for fsockopen to return an opened connection. PHP will still need to resolve the hostname (so its better to use IP addresses), but will return as soon as it has started to open the connection, so that we can move on to the next host.

There are two ways to achieve this; in PHP 5, you can use the new stream_socket_client() function as a drop-in replacement for fsockopen(). In earlier versions of PHP, you need to get your hands dirty and use the sockets extension.

Here's how to do it in PHP 5:

<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
$sockets = array();
/* Initiate connections to all the hosts simultaneously */
foreach ($hosts as $id => $host) {
    $s = stream_socket_client("$host:80", $errno, $errstr, $timeout, 
        STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
    if ($s) {
        $sockets[$id] = $s;
        $status[$id] = "in progress";
    } else {
        $status[$id] = "failed, $errno $errstr";
    }
}
/* Now, wait for the results to come back in */
while (count($sockets)) {
    $read = $write = $sockets;
    /* This is the magic function - explained below */
    $n = stream_select($read, $write, $e = null, $timeout);
    if ($n > 0) {
        /* readable sockets either have data for us, or are failed
         * connection attempts */
        foreach ($read as $r) {
            $id = array_search($r, $sockets);
            $data = fread($r, 8192);
            if (strlen($data) == 0) {
                if ($status[$id] == "in progress") {
                    $status[$id] = "failed to connect";
                }
                fclose($r);
                unset($sockets[$id]);
            } else {
                $status[$id] .= $data;
            }
        }
        /* writeable sockets can accept an HTTP request */
        foreach ($write as $w) {
            $id = array_search($w, $sockets);
            fwrite($w, "HEAD / HTTP/1.0\\r\\nHost: "
                . $hosts[$id] .  "\\r\\n\\r\\n");
            $status[$id] = "waiting for response";
        }
    } else {
        /* timed out waiting; assume that all hosts associated
         * with $sockets are faulty */
        foreach ($sockets as $id => $s) {
            $status[$id] = "timed out " . $status[$id];
        }
        break;
    }
}
foreach ($hosts as $id => $host) {
    echo "Host: $host\\n";
    echo "Status: " . $status[$id] . "\\n\\n";
}
?>

We are using stream_select() to wait for events on the sockets that we opened. stream_select() calls the system select(2) function and it works like this: The first three parameters are arrays of streams that you want to work with; you can wait for reading, writing and exceptional events (parameters one, two and three respectively). stream_select() will wait up to $timeout seconds for an event to occur - when it does, it will modify the arrays you passed in to contain the sockets that have met your criteria.

Now, using PHP 4.1.0 and later, if you have compiled in support for ext/sockets, you can use the same script as above, but you need to replace the regular streams/filesystem function calls with their equivalents from ext/sockets. The major difference though is in how we open the connection; instead of stream_socket_client(), you need to use this function:

<?php
// This value is correct for Linux, other systems have other values
define('EINPROGRESS', 115);
function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {
    $ip = gethostbyname($host);
    $s = socket_create(AF_INET, SOCK_STREAM, 0);
    if (socket_set_nonblock($s)) {
        $r = @socket_connect($s, $ip, $port);
        if ($r || socket_last_error() == EINPROGRESS) {
            $errno = EINPROGRESS;
            return $s;
        }
    }
    $errno = socket_last_error($s);
    $errstr = socket_strerror($errno);
    socket_close($s);
    return false;
}
?>

Now, replace stream_select() with socket_select(), fread() with socket_read(), fwrite() with socket_write() and fclose() with socket_close() and you are ready to run the script.

The advantage of the PHP 5 approach is that you can use stream_select() to wait on (almost!) any kind of stream - you can wait for keyboard input from the terminal by including STDIN in your read array for example, and you can also wait for data from pipes created by the proc_open() function.

If you want PHP 4.3.x and want to use the native streams approach, I have prepared a patch that allows fsockopen to work asynchronously. The patch is unsupported and won't be in an official PHP release, however, I've provided a wrapper that implements the stream_socket_client() function along with the patch, so that your code will be forwards compatible with PHP 5.

Resources:

documentation for stream_select()
documentation for socket_select()
patch for PHP 4.3.2 and script to emulate stream_socket_client(). (might work with later 4.3.x versions).