<?php
namespace Cms\CoreBundle\EventSubscriber\OneRoster;
use App\Doctrine\ORM\Filters\DiscardableFilter;
use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
use Cms\CoreBundle\Entity\OneRosterJob;
use Cms\CoreBundle\Events\OneRosterEvents;
use Cms\CoreBundle\Service\OneRosterService;
use Cms\CoreBundle\Util\Doctrine\EntityManager;
use Platform\QueueBundle\Event\AsyncEvent;
use Platform\QueueBundle\Model\AsyncMessage;
use Platform\QueueBundle\Service\AsyncQueueService;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
/**
* Class AbstractOneRosterSubscriber
* @package Cms\CoreBundle\EventSubscriber\OneRoster
*/
abstract class AbstractOneRosterSubscriber implements EventSubscriberInterface
{
protected const MQ_PRIORITY = AsyncMessage::PRIORITY__BELOW_NORMAL;
protected const PRIORITY__FIRST = PHP_INT_MAX;
protected const PRIORITY__LAST = PHP_INT_MIN;
// DI
protected EntityManager $em;
protected AsyncQueueService $async;
protected OneRosterService $oneroster;
protected EventDispatcherInterface $dispatcher;
/**
* @param EntityManager $em
* @param AsyncQueueService $async
* @param OneRosterService $oneroster
* @param EventDispatcherInterface $dispatcher
*/
public function __construct(
EntityManager $em,
AsyncQueueService $async,
OneRosterService $oneroster,
EventDispatcherInterface $dispatcher
)
{
$this->em = $em;
$this->async = $async;
$this->oneroster = $oneroster;
$this->dispatcher = $dispatcher;
}
/**
* @param AsyncEvent $event
* @return OneRosterJob
*/
protected function loadJob(AsyncEvent $event): OneRosterJob
{
// data should be an array with an id of a sync
// see if it is in cache first and load it from there if we can
if ($this->em->getCache() && $this->em->getCache()->containsEntity(OneRosterJob::class, $event->getParam('job'))) {
$job = $this->em->getRepository(OneRosterJob::class)->find($event->getParam('job'));
} else {
$job = $this->em->getRepository(OneRosterJob::class)->createQueryBuilder('job')
->leftJoin('job.sync', 'sync')
->andWhere('job.id = :job')
->setParameter('job', $event->getParam('job'))
->getQuery()
->getSingleResult();
}
// make sure we have a job
if ( ! $job instanceof OneRosterJob) {
throw new \Exception(sprintf(
'Could not load job #%s.',
$event->getParam('job')
));
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Sync #%s loaded',
$job->getIdentifier()
));
return $job;
}
/**
* @return int
*/
abstract public function phase(): int;
/**
* @return int|null
*/
abstract public function next(): ?int;
/**
* @param OneRosterJob $job
* @param array|int[] $checks
* @return bool
*/
protected function checkTypes(OneRosterJob $job, array $checks): bool
{
if (empty($checks)) {
throw new \Exception();
}
foreach ($checks as $check) {
if ($job->getSync()->hasStrategy($check)) {
return true;
}
}
return false;
}
/**
* @return void
*/
public function discardableDisable(): void
{
if ($this->em->getFilters()->isEnabled(DiscardableFilter::FILTER)) {
$this->em->getFilters()->disable(DiscardableFilter::FILTER);
}
}
/**
* @return void
*/
public function discardableEnable(): void
{
if ( ! $this->em->getFilters()->isEnabled(DiscardableFilter::FILTER)) {
$this->em->getFilters()->enable(DiscardableFilter::FILTER);
}
}
/**
* @param AsyncEvent $event
*/
public function phaseStart(AsyncEvent $event): void
{
// get the job
$job = $this->loadJob($event);
// tracking
$this->oneroster->orchestratePhaseInit($job, $this->phase());
// refresh the job since we did bulk update prior
$this->em->refresh($job);
}
/**
* @param AsyncEvent $event
*/
public function phaseFin(AsyncEvent $event): void
{
// get the job
$job = $this->loadJob($event);
// tracking
$this->oneroster->orchestratePhaseFin($job, $this->phase());
// refresh the job since we did bulk update prior
$this->em->refresh($job);
}
/**
* @param AsyncEvent $event
*/
public function phaseTrigger(AsyncEvent $event): void
{
// get the job
$job = $this->loadJob($event);
// see if we have a next phase and if it has been triggered yet
if ($this->oneroster->orchestratePhaseConditionalInit($job, $this->phase(), $this->next())) {
// add to mq
$this->async->send(
null,
new AsyncMessage(
$job,
OneRosterEvents::PHASES[$this->next()],
[
'job' => $job->getId(),
],
self::MQ_PRIORITY,
)
);
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'%s completed, %s triggered for sync #%s',
strtoupper(array_search($this->phase(), OneRosterJob::PHASES)),
strtoupper(array_search($this->next(), OneRosterJob::PHASES)),
$job->getIdentifier()
));
}
}
/**
* @param AsyncEvent $event
* @return array
*/
protected function parseForType(AsyncEvent $event): array
{
// should also tell us what type we are processing
// oneroster data type, not fqcn
$type = $event->getParam('type');
// determine the class based on the type
$class = AbstractOneRosterEntity::ONEROSTER_TYPES[$type];
return [
$type,
$class,
];
}
/**
* @param AsyncEvent $event
* @return array
*/
protected function parseForObject(AsyncEvent $event): array
{
// determine the class based on the type
$class = AbstractOneRosterEntity::ONEROSTER_TYPES[$event->getParam('type')];
// use the given id to load the data
$entity = $this->em->getRepository($class)->findOneBy([
'sourcedId' => $event->getParam('id'),
]);
if ( ! $entity instanceof AbstractOneRosterEntity || ! $entity instanceof $class) {
throw new \Exception();
}
return [
$class,
$entity,
];
}
}