<?php
namespace Products\NotificationsBundle\Subscriber;
use Cms\CoreBundle\Util\Doctrine\EntityManager;
use Platform\QueueBundle\Event\AsyncEvent;
use Products\NotificationsBundle\Entity\AbstractNotification;
use Products\NotificationsBundle\Entity\Job;
use Products\NotificationsBundle\Entity\Notifications\Channels\ChannelsInterface;
use Products\NotificationsBundle\Service\Switchboard;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
final class BroadcastSubscriber implements EventSubscriberInterface
{
/**
* @var EntityManager
*/
protected EntityManager $em;
/**
* @var Switchboard
*/
protected Switchboard $switchboard;
/**
* @param EntityManager $em
* @param Switchboard $switchboard
*/
public function __construct(EntityManager $em, Switchboard $switchboard)
{
$this->em = $em;
$this->switchboard = $switchboard;
}
/**
* {@inheritDoc}
*/
public static function getSubscribedEvents(): array
{
return [
Switchboard::EVENTS__BROADCAST_JOB => ['onBroadcastJob', 0],
Switchboard::EVENTS__BROADCAST_CHANNEL => ['onBroadcastChannel', 0],
Switchboard::EVENTS__BROADCAST_ITEM => ['onBroadcastItem', 0],
];
}
/**
* @param AsyncEvent $event
*/
public function onBroadcastJob(AsyncEvent $event)
{
$job = $this->em->getRepository(Job::class)->find(
$event->getBody()->get('job')
);
if ( ! $job instanceof Job) {
throw new \Exception();
}
$result = $this->switchboard->broadcastJob($job);
$event->getOutput()->writeln(sprintf(
'Broadcasting to %s channels.',
$result,
));
}
/**
* @param AsyncEvent $event
*/
public function onBroadcastChannel(AsyncEvent $event)
{
$job = $this->em->getRepository(Job::class)->find(
$event->getBody()->get('job')
);
if ( ! $job instanceof Job) {
throw new \Exception();
}
$result = $this->switchboard->broadcastChannel(
$job,
$channel = $event->getBody()->get('channel')
);
$event->getOutput()->writeln(sprintf(
'Broadcasting %s items to channel %s.',
$result,
array_search($channel, ChannelsInterface::CHANNELS),
));
}
/**
* @param AsyncEvent $event
*/
public function onBroadcastItem(AsyncEvent $event)
{
$job = $this->em->getRepository(Job::class)->find(
$event->getBody()->get('job')
);
if ( ! $job instanceof Job) {
throw new \Exception();
}
$this->switchboard->broadcastItem(
$job,
$event->getBody()->get('channel'),
$event->getBody()->get('item')// this will still be a scalar value, the handler for the channel should know how to load it
);
}
}