<?php
namespace Cms\CoreBundle\EventSubscriber\OneRoster;
use Cms\CoreBundle\Doctrine\Hydrators\SingleColumnHydrator;
use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
use Cms\CoreBundle\Entity\OneRosterJob;
use Cms\CoreBundle\Events\OneRosterEvents;
use Cms\CoreBundle\Events\OneRosterLinkEvent;
use Platform\QueueBundle\Event\AsyncEvent;
use Platform\QueueBundle\Model\AsyncMessage;
/**
* Class OneRosterLinkSubscriber
* @package Cms\CoreBundle\EventSubscriber\OneRoster
*/
final class OneRosterLinkSubscriber extends AbstractOneRosterSubscriber
{
/**
* {@inheritDoc}
*/
public static function getSubscribedEvents(): array
{
return [
OneRosterEvents::EVENT__LINK => [
['discardableDisable', self::PRIORITY__FIRST],
['phaseStart', self::PRIORITY__FIRST],
['link', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
OneRosterEvents::EVENT__LINK__TYPE => [
['discardableDisable', self::PRIORITY__FIRST],
['linkType', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
OneRosterEvents::EVENT__LINK__OBJECT => [
['discardableDisable', self::PRIORITY__FIRST],
['linkObject', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
];
}
/**
* {@inheritDoc}
*/
public function phase(): int
{
return OneRosterJob::PHASES__LINK;
}
/**
* {@inheritDoc}
*/
public function next(): ?int
{
return OneRosterJob::PHASES__TWEAK;
}
/**
* @param AsyncEvent $event
*/
public function link(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// setup calls to link each type
/** @var array|AsyncMessage[] $messages */
$messages = [];
foreach (array_keys(AbstractOneRosterEntity::ONEROSTER_TYPES) as $type) {
$messages[] = new AsyncMessage(
$job,
OneRosterEvents::EVENT__LINK__TYPE,
[
'job' => $job->getId(),
'type' => $type,
],
self::MQ_PRIORITY,
);
}
// update tracking props on the sync
$this->oneroster->orchestratePhaseChange($job, count($messages));
// queue them up
try {
$this->async->queue(
null,
$messages
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, count($messages));
throw $e;
}
// DEBUGGING
foreach ($messages as $message) {
$event->getOutput()->writeln(sprintf(
'LINK__TYPE of "%s" triggered for sync #%s',
$message->getPayload()['type'],
$job->getIdentifier()
));
}
}
/**
* Handles linking triggers of individual types of objects for a sync.
*
* @param AsyncEvent $event
*/
public function linkType(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// get data from the payload
[$type, $class] = $this->parseForType($event);
// get the objects
$objects = $this->em->getRepository($class)->createQueryBuilder('objects')
->select('objects.sourcedId')
->orderBy('objects.createdAt', 'ASC')
->getQuery()
->getResult(SingleColumnHydrator::HYDRATOR);
// loop over the object to create the messages to process each fully
$messages = [];
foreach ($objects as $index => $object) {
if ($index % AsyncMessage::MAX_BATCH === 0) {
$messages[] = [];
}
$messages[count($messages) - 1][] = new AsyncMessage(
$job,
OneRosterEvents::EVENT__LINK__OBJECT,
[
'job' => $job->getId(),
'type' => $type,
'id' => $object,
],
self::MQ_PRIORITY,
);
}
// do only if we have messages
foreach ($messages as $batch) {
// tracking
$this->oneroster->orchestratePhaseChange($job, count($batch));
// queue them up
try {
$this->async->queue(
null,
$batch
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, count($batch));
throw $e;
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'LINK__OBJECT batch: %s',
count($batch)
));
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'LINK__OBJECT for %s "%s" object(s) triggered for sync #%s',
array_sum(array_map('count', $messages)),
$type,
$job->getIdentifier()
));
}
/**
* Handle linking of a particular object for a sync.
* This triggers a "sub" event that allows things in the system to act upon the data.
*
* @param AsyncEvent $event
*/
public function linkObject(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// get data from the payload
/** @var AbstractOneRosterEntity $entity */
[$class, $entity] = $this->parseForObject($event);
// TODO: use dirty checks to see if we need to process this entity at all? may not be needed in linking step...
// need to create an event for processing of this specific type
$subevent = new OneRosterLinkEvent(
$entity,
$job,
$event->getOutput()
);
// capture errors
try {
// dispatch the event
$this->dispatcher->dispatch(
$subevent,
OneRosterLinkEvent::EVENTS[$class],
);
// if propagation was stopped, it means there was a problem
if ($subevent->isPropagationStopped()) {
throw new \Exception(
'Propagation was stopped while handling event.',
);
}
} catch (\Exception $e) {
$this->oneroster->logIssue(
$job,
$this->phase(),
OneRosterLinkEvent::EVENTS[$class],
$entity::ONEROSTER_TYPE,
$entity,
$e
);
}
}
}