Skip to content

Instantly share code, notes, and snippets.

@jverdeyen
Last active February 13, 2018 14:11
Show Gist options
  • Save jverdeyen/6641643dd7420992ac4d8693c0ee330c to your computer and use it in GitHub Desktop.
Save jverdeyen/6641643dd7420992ac4d8693c0ee330c to your computer and use it in GitHub Desktop.
Enqueue Symfony Console commands
# Application parameters plus the `enqueue` section (transport and client settings).
# NOTE(review): nesting indentation is missing in this copy of the file; restore it before use.
parameters:
persistent_cache_dir: /tmp
app_name: ProjectX
enqueue:
transport:
# The filesystem ('fs') transport is selected as the default one.
default: 'fs'
fs:
# The DSN is built from the persistent_cache_dir parameter defined above.
dsn: "file://%persistent_cache_dir%/enqueue"
path: "./"
pre_fetch_count: 1
# NOTE(review): YAML reads 600 as a decimal integer; if octal file permissions (0600) are intended, this value is probably wrong — confirm against the fs transport documentation.
chmod: 600
polling_interval: 100
# Alternative SQS transport, currently commented out.
# sqs:
# key: "%aws_s3_key%"
# secret: "%aws_s3_secret%"
# token: ~
# region: eu-west-1
# retries: 3
# version: '2012-11-05'
# lazy: true
client:
# Combines the app_name parameter with the current kernel environment.
app_name: '%app_name%_%kernel.environment%'
default_processor_queue: 'default'
traceable_producer: true
async_events: false
job: false
<?php
namespace AppBundle\Model\Queue;
/**
 * Value object describing a command to be queued: a name plus its parameters.
 *
 * Both values are fixed at construction time; the class exposes no setters.
 */
class QueuedCommand
{
    /**
     * Name of the command.
     *
     * @var string
     */
    private $name;

    /**
     * Parameters that accompany the command.
     *
     * @var array
     */
    private $parameters;

    /**
     * @param string $name       Name of the command.
     * @param array  $parameters Parameters that accompany the command.
     */
    public function __construct(string $name, array $parameters)
    {
        [$this->name, $this->parameters] = [$name, $parameters];
    }

    /**
     * Gives back the command name exactly as passed to the constructor.
     *
     * @return string
     */
    public function getName(): string
    {
        $commandName = $this->name;

        return $commandName;
    }

    /**
     * Gives back the parameter array exactly as passed to the constructor.
     *
     * @return array
     */
    public function getParameters(): array
    {
        $commandParameters = $this->parameters;

        return $commandParameters;
    }
}
<?php
namespace AppBundle\Service\Queue;
use AppBundle\Model\Queue\QueuedCommand;
use Enqueue\Client\ProducerInterface;
/**
 * Sends console command lines to the queue under the "run_command" command name.
 *
 * A command is either built from a QueuedCommand (name + parameters) or passed
 * in as a ready-made string.
 */
class QueuedCommandHandle
{
    /**
     * Producer used to send the command message.
     *
     * @var ProducerInterface
     */
    protected $producer;

    /**
     * Environment name appended to every generated argument string as --env.
     *
     * @var string
     */
    protected $env;

    /**
     * @param ProducerInterface $producer Producer used to send the command message.
     * @param string            $env      Environment name added as --env to built command lines.
     */
    public function __construct(ProducerInterface $producer, $env)
    {
        $this->producer = $producer;
        $this->env = $env;
    }

    /**
     * Builds "<name> <arguments>" from the given command and sends it.
     *
     * @param QueuedCommand $command Command name and parameters to serialise.
     */
    public function handle(QueuedCommand $command): void
    {
        $argumentString = $this->createArgumentString($command->getParameters());
        $fullCommand = sprintf('%s %s', $command->getName(), $argumentString);

        $this->producer->sendCommand('run_command', $fullCommand);
    }

    /**
     * Sends an already assembled command line unchanged.
     *
     * No escaping and no --env option are added here; the caller is
     * responsible for the full content of the string.
     *
     * @param string $fullCommand Complete command line to send.
     */
    public function handleString(string $fullCommand): void
    {
        $this->producer->sendCommand('run_command', $fullCommand);
    }

    /**
     * Turns a parameter array into a command-line argument string.
     *
     * Integer keys produce positional arguments, any other key produces
     * "--key=value". "--env=<env>" is always appended last.
     *
     * Every value is passed through escapeshellarg() so that a value containing
     * whitespace or shell metacharacters stays a single argument and cannot
     * inject additional commands once the string is executed as a command line.
     *
     * @param array $arguments Parameters keyed by position (int) or option name (string).
     *
     * @return string Space-separated argument string.
     */
    private function createArgumentString(array $arguments): string
    {
        $optionList = [];

        foreach ($arguments as $key => $value) {
            $escapedValue = escapeshellarg((string) $value);

            $optionList[] = is_int($key)
                ? $escapedValue
                : sprintf('--%s=%s', $key, $escapedValue);
        }

        $optionList[] = sprintf('--env=%s', escapeshellarg((string) $this->env));

        return implode(' ', $optionList);
    }
}
<?php
namespace AppBundle\Service\Queue\Processor;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\Result;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;
/**
 * Queue processor for the "run_command" command.
 *
 * Runs the message body as a `./bin/console` command line with the project
 * directory as working directory.
 */
class RunCommandProcessor implements PsrProcessor, CommandSubscriberInterface
{
    /**
     * Directory the console process is started in.
     *
     * @var string
     */
    private $projectDir;

    /**
     * @param string $projectDir Directory the console process is started in.
     */
    public function __construct(string $projectDir)
    {
        $this->projectDir = $projectDir;
    }

    /**
     * Executes the command line carried by the message.
     *
     * @param PsrMessage $message Message whose body is the console command line.
     * @param PsrContext $context Not used by this processor.
     *
     * @return mixed Result::ACK when the process succeeds, otherwise the value
     *               of Result::reject() carrying the failure description.
     */
    public function process(PsrMessage $message, PsrContext $context)
    {
        // NOTE(review): the body is concatenated into the command line without
        // any escaping, so producers must only send trusted, already escaped
        // command lines.
        $commandline = $message->getBody();
        $process = new Process('./bin/console '.$commandline, $this->projectDir);

        try {
            $process->mustRun();
        } catch (ProcessFailedException | \Symfony\Component\Process\Exception\ProcessTimedOutException $e) {
            // A timeout is rejected the same way as a non-zero exit code;
            // otherwise it would escape this method as an uncaught exception.
            return Result::reject(sprintf('The process failed with exception: "%s" in %s at %s', $e->getMessage(), $e->getFile(), $e->getLine()));
        }

        return Result::ACK;
    }

    /**
     * Name of the command this processor is subscribed to.
     *
     * @return string
     */
    public static function getSubscribedCommand()
    {
        return 'run_command';
    }
}
# Service definitions for the queued-command producer and the queue processor.
# NOTE(review): nesting indentation is missing in this copy of the file; restore it before use.
services:
# NOTE(review): the class declared in this file is QueuedCommandHandle (no trailing "r"), so this id does not match it — confirm which spelling is intended.
AppBundle\Service\Queue\QueuedCommandHandler:
arguments:
# Supplies the constructor's $env argument with the current kernel environment.
$env: "%kernel.environment%"
AppBundle\Service\Queue\Processor\RunCommandProcessor:
arguments:
# Passed to the processor's constructor as $projectDir.
- "%kernel.project_dir%"
tags:
- { name: 'enqueue.client.processor', topicName: 'run_command' }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment