I must admit that nowadays I’m using RabbitMQ more than Gearman but I’m still a big fan of gearman. PHP has a great api to connect to gearman work server but sometimes I miss another, how to say, “clean” way. Because of that I’ve creates a gearman wrapper. Let’s start.
I want to cover different areas: Workers, clients, background clients, and tasks.
Worker example:
use G\Gearman\Builder;
$worker = Builder::createWorker();
$worker->on("slow.process", function ($response, \GearmanJob $job) {
echo "Response: {$response} unique: {$job->unique()}\n";
// we emulate a slow process with a sleep
sleep(2);
return $job->unique();
});
$worker->on("fast.process", function ($response, \GearmanJob $job) {
echo "Response: {$response} unique: {$job->unique()}\n";
return $job->unique();
});
$worker->on("exception.process", function () {
// we emulate a failing process
throw new \Exception("Something wrong happens");
});
$worker->run();
And a client:
use G\Gearman\Builder;
$client = Builder::createClient();
$client->onSuccess(function ($response) {
echo $response;
});
$client->doNormal('fast.process', "Hello");
One background client
use G\Gearman\Builder;
$client = Builder::createClient();
$client->doBackground('slow.process', "Hello1");
$client->doBackground('slow.process', "Hello2");
$client->doBackground('slow.process', "Hello3");
And finally, tasks
use G\Gearman\Builder;
$tasks = Builder::createTasks();
$tasks->onSuccess(function (\GearmanTask $task, $context) {
$out = is_callable($context) ? $context($task) : $task->data();
echo "onSuccess response: " . $out . " id: {$task->unique()}\n";
});
$tasks->onException(function (\GearmanTask $task) {
echo "onException response {$task->data()}\n";
});
$responseParser = function (\GearmanTask $task) {
return "Hello " . $task->data();
};
$tasks->addTask('fast.process', "fast1", $responseParser, 'g1');
$tasks->addTaskHigh('slow.process', "slow1", null, 'xxxx');
$tasks->addTask('fast.process', "fast2");
$tasks->addTask('exception.process', 'hi');
$tasks->runTasks();
The library is just a wrapper to the official api. I’ve create a builder to simplify the creation of the instances:
namespace G\Gearman;
class Builder
{
static function createWorker($servers=null)
{
$worker = new \GearmanWorker();
$worker->addServers($servers);
return new Worker($worker);
}
static function createClient($servers=null)
{
$client = new \GearmanClient();
$client->addServers($servers);
return new Client($client);
}
static function createTasks($servers=null)
{
$client = new \GearmanClient();
$client->addServers($servers);
return new Tasks($client);
}
}
that’s the worker wrapper
namespace G\Gearman;
class Worker
{
private $worker;
public function __construct(\GearmanWorker $worker)
{
$this->worker = $worker;
}
public function on($name, callable $callback, $context = null, $timeout = 0)
{
$this->worker->addFunction($name, function (\GearmanJob $job) use ($callback) {
return call_user_func($callback, json_decode($job->workload()), $job);
}, $context, $timeout);
}
public function run()
{
try {
$this->loop();
} catch (\Exception $e) {
echo $e->getMessage() . "\n";
$this->run();
}
}
private function loop()
{
while ($this->worker->work()) {
}
}
}
Now the client one
namespace G\Gearman;
class Client
{
private $onSuccessCallback;
private $client;
public function __construct(\GearmanClient $client)
{
$this->client = $client;
}
public function doHigh($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
public function doNormal($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
public function doLow($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
public function doBackground($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
public function doHighBackground($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
public function doLowBackground($name, $workload=null, $unique = null)
{
return $this->doAction(__FUNCTION__, $name, $workload, $unique);
}
private function doAction($action, $name, $workload=null, $unique)
{
$workload = (string)$workload;
$handle = $this->client->$action($name, json_encode($workload), $unique);
$returnCode = $this->client->returnCode();
if ($returnCode != \GEARMAN_SUCCESS) {
throw new \Exception($this->client->error(), $returnCode);
} else {
if ($this->onSuccessCallback) {
return call_user_func($this->onSuccessCallback, $handle);
}
}
return null;
}
public function onSuccess(callable $callback)
{
$this->onSuccessCallback = $callback;
}
}
and finally the tasks
namespace G\Gearman;
class Tasks
{
private $client;
private $tasks;
public function __construct(\GearmanClient $client)
{
$this->tasks = [];
$this->client = $client;
}
public function addTask($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function addTaskHigh($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function addTaskLow($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function addTaskBackground($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function addTaskHighBackground($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function addTaskLowBackground($name, $workload=null, $context = null, $unique = null)
{
$this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
}
public function runTasks()
{
foreach ($this->tasks as list($actionName, $name, $workload, $context, $unique)) {
$this->client->$actionName($name, json_encode($workload), $context, $unique);
}
$this->client->runTasks();
}
public function onSuccess(callable $callback)
{
$this->client->setCompleteCallback($callback);
}
public function onException(callable $callback)
{
$this->client->setExceptionCallback($callback);
}
public function onFail(callable $callback)
{
$this->client->setFailCallback($callback);
}
}
Library is available in packagist and source code in my github account.