<?php
namespace Cms\CoreBundle\EventSubscriber\OneRoster;
use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
use Cms\CoreBundle\Entity\OneRoster\OneRosterUser;
use Cms\CoreBundle\Entity\OneRosterJob;
use Cms\CoreBundle\Entity\OneRosterSync;
use Cms\CoreBundle\Events\OneRosterEvents;
use Cms\CoreBundle\Model\OneRosterStateBitwise;
use DateTimeInterface;
use Platform\QueueBundle\Event\AsyncEvent;
use Platform\QueueBundle\Model\AsyncMessage;
/**
* Class OneRosterStashSubscriber
* @package Cms\CoreBundle\EventSubscriber\OneRoster
*/
final class OneRosterStashSubscriber extends AbstractOneRosterSubscriber
{
/**
* {@inheritDoc}
*/
public static function getSubscribedEvents(): array
{
return [
OneRosterEvents::EVENT__STASH => [
['discardableDisable', self::PRIORITY__FIRST],
['phaseStart', self::PRIORITY__FIRST],
['stash', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
OneRosterEvents::EVENT__STASH__TYPE => [
['discardableDisable', self::PRIORITY__FIRST],
['stashType', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
OneRosterEvents::EVENT__STASH__OBJECT => [
['discardableDisable', self::PRIORITY__FIRST],
['stashObject', 0],
['phaseFin', self::PRIORITY__LAST],
['phaseTrigger', self::PRIORITY__LAST],
['discardableEnable', self::PRIORITY__LAST],
],
];
}
/**
* {@inheritDoc}
*/
public function phase(): int
{
return OneRosterJob::PHASES__STASH;
}
/**
* {@inheritDoc}
*/
public function next(): ?int
{
return OneRosterJob::PHASES__FIX;
}
/**
* Handles the basic event of stashing a particular sync that has been triggered.
*
* @param AsyncEvent $event
*/
public function stash(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// determine the types we need to pull
$types = array_fill_keys(array_keys(AbstractOneRosterEntity::ONEROSTER_TYPES), false);
foreach ($job->getSync()->getStrategiesAsArray() as $strategy) {
$stypes = array_keys(OneRosterSync::ONEROSTER_TYPES_MAPPINGS[$strategy]);
foreach ($stypes as $stype) {
$types[$stype] = true;
}
}
$types = array_keys(array_filter($types));
// setup calls to process each type
/** @var array|AsyncMessage[] $messages */
$messages = [];
foreach ($types as $type) {
$messages[] = new AsyncMessage(
$job,
OneRosterEvents::EVENT__STASH__TYPE,
[
'job' => $job->getId(),
'type' => $type,
'page' => 0,
],
self::MQ_PRIORITY,
);
}
// tracking
$this->oneroster->orchestratePhaseChange($job, count($messages));
// queue them up
try {
$this->async->queue(
null,
$messages
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, count($messages));
throw $e;
}
// DEBUGGING
foreach ($messages as $message) {
$event->getOutput()->writeln(sprintf(
'STASH__TYPE of "%s" triggered for sync #%s',
$message->getPayload()['type'],
$job->getIdentifier()
));
}
}
/**
* Handles a stashing event for a particular type of data for a sync.
*
* @param AsyncEvent $event
*/
public function stashType(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// should also tell us what type we are pulling
// this type is a string oneroster type, not a fqcn
$type = $event->getParam('type');
$page = $event->getParam('page');
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Obtaining page %s of data',
$page
));
// will be working with the oneroster api
$api = $this->oneroster->api($job);
// make api calls to get the data of these types and to make messages out of them
$messages = [];
// get the objects
$objects = $api->getObjects($type, $page);
// check pagination here so other workers can go ahead and be chewing on stuff while we are still processing
// if we have hit the limit, we need to page
if ($api->paginate($objects)) {
// increment the page
$page++;
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Need to page, adding message for page %s of data',
$page
));
// tracking
$this->oneroster->orchestratePhaseChange($job, 1);
// queue the next page
try {
$this->async->send(
null,
new AsyncMessage(
$job,
OneRosterEvents::EVENT__STASH__TYPE,
[
'job' => $job->getId(),
'type' => $type,
'page' => $page,
],
self::MQ_PRIORITY,
)
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, 1);
throw $e;
}
}
// determine what roles we are interested in
$roles = [];
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__SSO)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__ADMINISTRATOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__AIDE,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PROCTOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__TEACHER,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STAFF,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STAFF)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__ADMINISTRATOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__AIDE,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PROCTOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__TEACHER,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STAFF,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__FAMILY)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__GUARDIAN,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PARENT,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__RELATIVE,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STUDENT,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STUDENTS)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STUDENT,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__COMMUNITY)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__COMMUNITY,
]);
}
$roles = array_unique($roles);
// loop over the object to create the messages to process each fully
foreach ($objects as $object) {
// attempt to pick off critical errors with incoming data
try {
// check for no role on the data
if ($type === OneRosterUser::ONEROSTER_TYPE && ! array_key_exists('role', $object)) {
throw new \Exception('Role not given for user.');
}
} catch (\Exception $e) {
$this->oneroster->logIssue(
$job,
$this->phase(),
OneRosterEvents::EVENT__STASH__TYPE,
OneRosterUser::ONEROSTER_TYPE,
$object['sourcedId'],
$e
);
continue;
}
// handle special use cases that we want to skip
// TODO: this needs done better; cases are confusing/complex and makes the file messy...
$skip = false;
switch (true) {
// TODO: do we need student data when processing the other types?
// if we are dealing with users, skip if:
// no role in data set
// role in data set but is empty
// role is there but not in the array of roles that we care about
case $type == OneRosterUser::ONEROSTER_TYPE && ( ! array_key_exists('role', $object) || empty($object['role']) || ! in_array($object['role'], $roles)):
$skip = true;
break;
// handle powerschool emergency contacts hack...
// if the sync has the flag set, check and see if the ps id is given
// if it is, if the ps id has a space in it, skip it (likely it is an emergency contact record)
case $job->getSync()->hasFlag(OneRosterSync::FLAGS__POWERSCHOOL__SKIP_EMERGENCY_CONTACTS) && $type == OneRosterUser::ONEROSTER_TYPE && (array_key_exists('userIds', $object)):
foreach ($object['userIds'] as $userId) {
if (array_key_exists('type', $userId) && $userId['type'] === 'ps_id') {
// TODO: multibyte?
if (array_key_exists('identifier', $userId) && preg_match('/^[^ ]+ of /', $userId['identifier']) === 1) {
$skip = true;
break;
}
}
}
break;
}
// if the data type is useful to us, make mq message for it
if ( ! $skip) {
$messages[] = new AsyncMessage(
$job,
OneRosterEvents::EVENT__STASH__OBJECT,
[
'job' => $job->getId(),
'type' => $type,
'data' => $object,
],
self::MQ_PRIORITY,
);
}
}
// do only if we have anything to add to the system
if ($messages) {
// tracking
$this->oneroster->orchestratePhaseChange($job, count($messages));
// queue them up
try {
$this->async->queue(
null,
$messages
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, count($messages));
throw $e;
}
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'STASH__OBJECT for %s "%s" object(s) triggered for sync #%s',
count($messages),
$type,
$job->getIdentifier()
));
}
/**
* Handles an event for stashing a particular object of a certain type for a sync.
*
* @param AsyncEvent $event
*/
public function stashObject(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// should also tell us what type we are pulling
// oneroster type, not fqcn
$type = $event->getParam('type');
// determine the class based on the type
$class = AbstractOneRosterEntity::ONEROSTER_TYPES[$type];
// also grab the raw data from the api
$data = $event->getParam('data');
// see if we already have an object of this type of this id
/** @var AbstractOneRosterEntity $entity */
$entity = $this->em->getRepository($class)->findOneBy([
'sourcedId' => $data['sourcedId'],
]);
if (empty($entity)) {
// no entity, so make a new one
$entity = new $class();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Entity "%s" created for #%s',
$class,
$data['sourcedId']
));
// since we have made a new entity, it means that we are inherently dirty
$entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_NEW);
// DEBUGGING
$event->getOutput()->writeln(
'DIRTY_VIA_NEW flagged'
);
// no previous data, need to clear it all
$status = null;
$dateLastModified = null;
$checksum = null;
} else {
// make sure the object is not marked as clean
$entity->getState()->removeFlag(OneRosterStateBitwise::CLEAN);
// save off the previous data that we need to check against for dirty checks
$status = $entity->getStatus();
$dateLastModified = $entity->getDateLastModified();
$checksum = $entity->getChecksum();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Entity "%s" found for #%s',
$class,
$entity->getSourcedId()
));
}
// go ahead and attach the job and sync to this entity
$entity
->setSync($this->em->getReference(OneRosterSync::class, $job->getSync()->getId()))
->setJob($this->em->getReference(OneRosterJob::class, $job->getId()));
// merge in the data from the api
$entity->merge($data);
// compare status dirty check
if ($status != $entity->getStatus()) {
// status changed, object is dirty
$entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_STATUS);
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'DIRTY_VIA_STATUS flagged (%s != %s)',
(empty($status)) ? 'null' : $status,
$entity->getStatus()
));
}
// compare date last modified dirty check
if ($dateLastModified != $entity->getDateLastModified()) {
// date last modified changed, object is dirty
$entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_DATE_MODIFIED);
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'DIRTY_VIA_DATE_MODIFIED flagged (%s != %s)',
(empty($dateLastModified)) ? 'null' : $dateLastModified->format(DateTimeInterface::RFC3339_EXTENDED),
$entity->getDateLastModified()->format(DateTimeInterface::RFC3339_EXTENDED)
));
}
// compare crc dirty check
// calc new crc first
$entity->setChecksum(crc32(json_encode($entity->data())));
if ($checksum != $entity->getChecksum()) {
// checksum changed, object is dirty
$entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_CHECKSUM);
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'DIRTY_VIA_CHECKSUM flagged (%s != %s)',
(empty($checksum)) ? 'null' : $checksum,
$entity->getChecksum()
));
}
// TODO: check object conditionals for dirty data
// see if we are dirty
if ($entity->getState()->isDirty()) {
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Record is DIRTY (%s)',
implode(', ', $entity->getState()->getFlagNames(null, true))
));
} else {
// all checks are pointing to the object being clean, mark it as such
$entity->getState()->addFlag(OneRosterStateBitwise::CLEAN);
// DEBUGGING
$event->getOutput()->writeln(
'Record is CLEAN'
);
}
// save it
$this->em->save($entity);
}
}