Skip to content

Instantly share code, notes, and snippets.

@AydinHassan
Created June 10, 2025 14:44
Show Gist options
  • Save AydinHassan/f1edcda65c344879d411b758fdf0951a to your computer and use it in GitHub Desktop.
Save AydinHassan/f1edcda65c344879d411b758fdf0951a to your computer and use it in GitHub Desktop.
Symfony Messenger without the framework
<?php
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
require_once __DIR__ . '/vendor/autoload.php';
/**
 * Payload-less message used by this example.
 *
 * It carries no data; the factory below keys its handler and its sender
 * routing on this class name.
 */
class Message
{
}
/**
 * Hand-wires the Symfony Messenger pieces used by this example: a single
 * in-memory transport, a message bus that routes Message to it, and a worker
 * that consumes from it with a retry listener attached.
 */
class MessagesFactory
{
    /** Key under which the transport, its sender and its retry strategy are registered. */
    private const TRANSPORT_NAME = 'async';

    /** Lazily created transport instance shared by the bus and the worker. */
    private static ?TransportInterface $transport = null;

    /**
     * Builds a bus with the send middleware followed by the handle middleware.
     *
     * The only registered handler for Message dumps a marker string and then
     * always throws, which is what drives the retry path in this example.
     */
    public static function createMessageBus(): MessageBus
    {
        $handlers = new HandlersLocator([
            Message::class => [
                new class {
                    public function __invoke(Message $message): void
                    {
                        var_dump('executing handler');

                        throw new \RuntimeException('error');
                    }
                },
            ],
        ]);

        return new MessageBus([
            new SendMessageMiddleware(self::getSendersLocator()),
            new HandleMessageMiddleware($handlers),
        ]);
    }

    /**
     * Returns a senders locator that routes Message to the shared transport.
     */
    public static function getSendersLocator(): SendersLocator
    {
        return new SendersLocator(
            [
                Message::class => [self::TRANSPORT_NAME],
            ],
            self::createTransportContainer()
        );
    }

    /**
     * Builds a worker that receives from the shared transport, handles
     * messages through a fresh bus, and has the retry listener subscribed.
     *
     * A listener on WorkerMessageRetriedEvent dumps the retry count of each
     * retried envelope.
     */
    public static function createWorker(): Worker
    {
        $eventDispatcher = new EventDispatcher();
        $eventDispatcher->addListener(
            WorkerMessageRetriedEvent::class,
            static function (WorkerMessageRetriedEvent $event): void {
                $retryCount = $event->getEnvelope()->last(RedeliveryStamp::class)->getRetryCount();
                var_dump('message retried num ' . $retryCount);
            }
        );

        // Retry strategy constructed with its default arguments, keyed by transport name.
        $retryStrategyLocator = new \Symfony\Component\DependencyInjection\Container();
        $retryStrategyLocator->set(self::TRANSPORT_NAME, new MultiplierRetryStrategy());

        $retryListener = new SendFailedMessageForRetryListener(
            self::createTransportContainer(),
            $retryStrategyLocator,
            null,
            $eventDispatcher
        );
        $eventDispatcher->addSubscriber($retryListener);

        return new Worker(
            [
                self::TRANSPORT_NAME => self::getTransport(),
            ],
            self::createMessageBus(),
            $eventDispatcher
        );
    }

    /**
     * Creates a new container exposing the shared transport under TRANSPORT_NAME.
     *
     * Used both for routing dispatched messages and for re-sending failed ones.
     */
    private static function createTransportContainer(): \Symfony\Component\DependencyInjection\Container
    {
        $container = new \Symfony\Component\DependencyInjection\Container();
        $container->set(self::TRANSPORT_NAME, self::getTransport());

        return $container;
    }

    /**
     * Returns the shared in-memory transport, creating it on first use.
     */
    private static function getTransport(): TransportInterface
    {
        return self::$transport ??= new InMemoryTransport();
    }
}
// Dispatch one Message through a freshly built bus.
MessagesFactory::createMessageBus()->dispatch(new Message());

// Build the worker and start consuming.
// NOTE(review): run() presumably blocks until the worker is stopped — confirm.
MessagesFactory::createWorker()->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment