<?php
namespace Products\NotificationsBundle\Subscriber\OneRoster;
use App\Service\Data\PhoneNumberService;
use App\Service\Vendors\Twilio\TwilioFactory;
use Cms\CoreBundle\Doctrine\Hydrators\SingleColumnHydrator;
use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
use Cms\CoreBundle\Entity\OneRoster\OneRosterUser;
use Cms\CoreBundle\Entity\OneRosterJob;
use Cms\CoreBundle\Entity\OneRosterSync;
use Cms\CoreBundle\Events\OneRosterAdhocEvent;
use Cms\CoreBundle\Events\OneRosterEvents;
use Cms\CoreBundle\Service\OneRosterService;
use Cms\CoreBundle\Util\DateTimeUtils;
use Cms\CoreBundle\Util\Doctrine\EntityManager;
use PDO;
use Platform\QueueBundle\Event\AsyncEvent;
use Platform\QueueBundle\Model\AsyncMessage;
use Platform\QueueBundle\Service\AsyncQueueService;
use Products\NotificationsBundle\Entity\AbstractRecipient;
use Products\NotificationsBundle\Entity\Profile;
use Products\NotificationsBundle\Entity\ProfileContact;
use Products\NotificationsBundle\Entity\ProfileRelationship;
use Products\NotificationsBundle\Entity\Recipients\EmailRecipient;
use Products\NotificationsBundle\Entity\Recipients\PhoneRecipient;
use Products\NotificationsBundle\Entity\Student;
use Products\NotificationsBundle\Util\Reachability;
use Symfony\Contracts\Translation\TranslatorInterface;
/**
* Class OneRosterPrepareSubscriber
* @package Products\NotificationsBundle\Subscriber\OneRoster
*/
final class OneRosterPrepareSubscriber extends AbstractNotificationsOneRosterSubscriber
{
const METADATA__PHONES = [
'gg4l.home_phone' => AbstractRecipient::KINDS__VOICE,
'gg4l.work_phone' => AbstractRecipient::KINDS__VOICE,
];
const METADATA__EXTRA_PHONES = '_phones_extra';
const METADATA__EXTRA_EMAILS = '_emails_extra';
const ADHOC_EVENTS__CONTACT_PREPARATION = OneRosterEvents::EVENT__PREPARE . '.adhoc.notifications.contact_preparation';
// DI
protected AsyncQueueService $async;
protected PhoneNumberService $phones;
protected TwilioFactory $twilio;
/**
* {@inheritdoc}
*/
public static function getSubscribedEvents(): array
{
return [
OneRosterEvents::EVENT__PREPARE => [
['discardProfiles', 0],
['discardStudents', 0],
// TODO: just remove this stuff?
//['disassociateContacts', 0],
// TODO: do we need to have these things using the discardable stuff?
['disassociateRelations', 0],
['contactsPrep', 0],
],
self::ADHOC_EVENTS__CONTACT_PREPARATION => [
['contactInjection', 0],
['contactLookup', 0],
],
];
}
/**
* {@inheritDoc}
* @param AsyncQueueService $async
* @param PhoneNumberService $phones
* @param TwilioFactory $twilio
*/
public function __construct(
EntityManager $em,
OneRosterService $oneroster,
TranslatorInterface $translator,
AsyncQueueService $async,
PhoneNumberService $phones,
TwilioFactory $twilio
)
{
parent::__construct($em, $oneroster, $translator);
$this->async = $async;
$this->phones = $phones;
$this->twilio = $twilio;
}
/**
* @param AsyncEvent $event
* @return void
*/
public function discardProfiles(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// run bulk query
// this should discard every profile that has an invalid/missing oneroster id
$discards = $this->em->createQueryBuilder()
->update(Profile::class, 'profiles')
->set('profiles.discarded', ':state')
->setParameter('state', true)
->set('profiles.discardedAt', ':timestamp')
->setParameter('timestamp', DateTimeUtils::now())
->andWhere('profiles.tenant = :tenant')// filter by tenant
->setParameter('tenant', $job->getTenant()->getId())
->andWhere('profiles.discarded != :state')// for performance, only update ones not already discarded
->andWhere($this->em->getExpressionBuilder()->notIn(
'profiles.onerosterId',
$this->em->createQueryBuilder()
->select('objects.sourcedId')
->from(AbstractOneRosterEntity::class, 'objects')
->andWhere('objects.tenant = :tenant')// we are making a string subquery, so the tenant var in the other qb will be used here eventually
->andWhere('objects.status = :status')// filter out things that are marked as to be deleted
->getDQL()
))// only match objects that are missing from stashed objects
->setParameter('status', AbstractOneRosterEntity::ENUMS__STATUS_TYPE__ACTIVE)// have to set the parameter here for the subquery
->getQuery()
->execute();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Discarded %s profiles for sync #%s',
$discards,
$job->getIdentifier()
));
}
/**
* @param AsyncEvent $event
* @return void
*/
public function discardStudents(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// run bulk query
// this should discard every profile that has an invalid/missing oneroster id
$discards = $this->em->createQueryBuilder()
->update(Student::class, 'students')
->set('students.discarded', ':state')
->setParameter('state', true)
->set('students.discardedAt', ':timestamp')
->setParameter('timestamp', DateTimeUtils::now())
->andWhere('students.tenant = :tenant')// filter by tenant
->setParameter('tenant', $job->getTenant()->getId())
->andWhere('students.discarded != :state')// for performance, only update ones not already discarded
->andWhere($this->em->getExpressionBuilder()->notIn(
'students.onerosterId',
$this->em->createQueryBuilder()
->select('objects.sourcedId')
->from(AbstractOneRosterEntity::class, 'objects')
->andWhere('objects.tenant = :tenant')// we are making a string subquery, so the tenant var in the other qb will be used here eventually
->andWhere('objects.status = :status')// filter out things that are marked as to be deleted
->getDQL()
))// only match objects that are missing from stashed objects
->setParameter('status', AbstractOneRosterEntity::ENUMS__STATUS_TYPE__ACTIVE)// have to set the parameter here for the subquery
->getQuery()
->execute();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Discarded %s students for sync #%s',
$discards,
$job->getIdentifier()
));
}
/**
* @param AsyncEvent $event
* @return void
*/
public function disassociateContacts(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// run bulk query
// any profile contact tied to a profile that has been discarded should be removed
$removals = $this->em->createQueryBuilder()
->delete(ProfileContact::class, 'contacts')
->andWhere('contacts.tenant = :tenant')// filter by tenant
->setParameter('tenant', $job->getTenant()->getId())
->andWhere($this->em->getExpressionBuilder()->in(
'contacts.profile',
$this->em->createQueryBuilder()
->select('profiles.id')
->from(Profile::class, 'profiles')
->andWhere('profiles.tenant = :tenant')// we are making a string subquery, so the tenant var in the other qb will be used here eventually
->andWhere('profiles.discarded = :discarded')
->getDQL()
))// only match things that are tied to discarded profiles
->setParameter('discarded', true)// setting parameter here because doing a subquery
->getQuery()
->execute();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Disassociated %s profile contacts for sync #%s',
$removals,
$job->getIdentifier()
));
}
/**
* @param AsyncEvent $event
* @return void
*/
public function disassociateRelations(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// run bulk query
// any profile contact tied to a profile that has been discarded should be removed
$removals = $this->em->createQueryBuilder()
->delete(ProfileRelationship::class, 'relationships')
->andWhere('relationships.tenant = :tenant')// filter by tenant
->setParameter('tenant', $job->getTenant()->getId())
->andWhere($this->em->getExpressionBuilder()->orX(
$this->em->getExpressionBuilder()->in(
'relationships.profile',
$this->em->createQueryBuilder()
->select('profiles.id')
->from(Profile::class, 'profiles')
->andWhere('profiles.tenant = :tenant')// we are making a string subquery, so the tenant var in the other qb will be used here eventually
->andWhere('profiles.discarded = :discarded')
->getDQL()
),
$this->em->getExpressionBuilder()->in(
'relationships.student',
$this->em->createQueryBuilder()
->select('students.id')
->from(Student::class, 'students')
->andWhere('students.tenant = :tenant')// we are making a string subquery, so the tenant var in the other qb will be used here eventually
->andWhere('students.discarded = :discarded')
->getDQL()
)
))// handle either the student being discarded or the profile being discarded
->setParameter('discarded', true)// setting parameter here because doing a subquery
->getQuery()
->execute();
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'Disassociated %s profile relationships for sync #%s',
$removals,
$job->getIdentifier()
));
}
/**
* @param AsyncEvent $event
* @return void
*/
public function contactsPrep(AsyncEvent $event): void
{
// data should be an array with an id of a sync
$job = $this->loadJob($event);
// determine what roles we are interested in
$roles = [];
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STAFF)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__ADMINISTRATOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__AIDE,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PROCTOR,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__TEACHER,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STAFF,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__FAMILY)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__GUARDIAN,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PARENT,
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__RELATIVE,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STUDENTS)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STUDENT,
]);
}
if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__COMMUNITY)) {
$roles = array_merge($roles, [
AbstractOneRosterEntity::ENUMS__ROLE_TYPE__COMMUNITY,
]);
}
$roles = array_unique($roles);
// get all the unique emails from the stashed data
$contacts = array_fill_keys(
array_values(array_unique(array_filter(array_map(
function (?string $contact) {
return strtolower(trim($contact));
},
$this->em->createQueryBuilder()
->select('DISTINCT users.email')
->from(OneRosterUser::class, 'users')
->andWhere('users.sync = :sync')
->setParameter('sync', $job->getSync())
->andWhere('users.role IN (:roles)')
->setParameter('roles', $roles)
->andWhere('users.email IS NOT NULL')
->andWhere('users.email <> \'\'')
->getQuery()
->getResult(SingleColumnHydrator::HYDRATOR)
)))),
AbstractRecipient::KINDS__EMAIL
);
// get extra phones coming from our data tool
$metadata = $this->em->getClassMetadata(OneRosterUser::class);
$contacts = array_merge(
$contacts,
array_fill_keys(
array_values(array_unique(array_filter(array_map(
function (?string $contact) {
return strtolower(trim($contact));
},
$this->em->getConnection()->executeQuery(
sprintf(trim('
SELECT
DISTINCT meta.contact
FROM
%s AS users,
JSON_TABLE(
users.%s,
\'$.%s\' COLUMNS (
NESTED PATH \'$[*]\' COLUMNS(
contact VARCHAR(255) PATH \'$\'
)
)
) meta
WHERE
users.%s = ?
AND
meta.contact IS NOT NULL
AND
meta.contact <> \'\'
'),
$metadata->getTableName(),
$metadata->getColumnName('metadata'),
self::METADATA__EXTRA_EMAILS,
$metadata->getColumnName('tenant'),
),
[$job->getTenant()->getId()],
[PDO::PARAM_INT],
)->fetchFirstColumn()
)))),
AbstractRecipient::KINDS__EMAIL,
),
);
// get all the unique landline phones from the stashed data
// this is done before sms as we want to prefer sms if possible
$contacts = array_merge(
$contacts,
array_fill_keys(
array_values(array_unique(array_filter(array_map(
function (?string $contact) {
try {
return $this->phones->normalize($contact);
} catch (\Exception $e) {
// TODO: do we need to maybe log something?
return null;
}
},
$this->em->createQueryBuilder()
->select('DISTINCT users.phone')
->from(OneRosterUser::class, 'users')
->andWhere('users.sync = :sync')
->setParameter('sync', $job->getSync())
->andWhere('users.role IN (:roles)')
->setParameter('roles', $roles)
->andWhere('users.phone IS NOT NULL')
->andWhere('users.phone <> \'\'')
->getQuery()
->getResult(SingleColumnHydrator::HYDRATOR)
)))),
AbstractRecipient::KINDS__VOICE
)
);
// get all the work phone numbers
foreach (self::METADATA__PHONES as $field => $kind) {
$contacts = array_merge(
$contacts,
array_fill_keys(
array_values(array_unique(array_filter(array_map(
function (?string $contact) {
// NOTE: the json functions in mysql return a json encoded string
$decoded = json_decode($contact);
if (json_last_error() !== JSON_ERROR_NONE || ! $decoded) {
return null;
}
try {
return $this->phones->normalize($decoded);
} catch (\Exception $e) {
// TODO: do we need to maybe log something?
return null;
}
},
$this->em->createQueryBuilder()
->select(sprintf(
'DISTINCT JSON_EXTRACT(users.metadata, \'$."%s"\') AS phone',
$field
))
->from(OneRosterUser::class, 'users')
->andWhere('users.sync = :sync')
->setParameter('sync', $job->getSync())
->andWhere('users.role IN (:roles)')
->setParameter('roles', $roles)
->andHaving('phone IS NOT NULL')
->andHaving('phone <> \'\'')
->getQuery()
->getResult(SingleColumnHydrator::HYDRATOR)
)))),
$kind
)
);
}
// get extra phones coming from our data tool
$metadata = $this->em->getClassMetadata(OneRosterUser::class);
$contacts = array_merge(
$contacts,
array_fill_keys(
$this->em->getConnection()->executeQuery(
sprintf(trim('
SELECT
DISTINCT meta.contact
FROM
%s AS users,
JSON_TABLE(
users.%s,
\'$.%s\' COLUMNS (
NESTED PATH \'$[*]\' COLUMNS(
contact VARCHAR(255) PATH \'$\'
)
)
) meta
WHERE
users.%s = ?
AND
meta.contact IS NOT NULL
AND
meta.contact <> \'\'
'),
$metadata->getTableName(),
$metadata->getColumnName('metadata'),
self::METADATA__EXTRA_PHONES,
$metadata->getColumnName('tenant'),
),
[$job->getTenant()->getId()],
[PDO::PARAM_INT],
)->fetchFirstColumn(),
AbstractRecipient::KINDS__VOICE,
),
);
// get all the unique cell phones from the stashed data
// if a number was used for both sms and phone, it will prefer sms
// this works because we are merging associative arrays
// duplicates are prevented as contacts are normalized and then merged into existing array
$contacts = array_merge(
$contacts,
array_fill_keys(
array_values(array_unique(array_filter(array_map(
function (?string $contact) {
try {
return $this->phones->normalize($contact);
} catch (\Exception $e) {
// TODO: do we need to maybe log something?
return null;
}
},
$this->em->createQueryBuilder()
->select('DISTINCT users.sms')
->from(OneRosterUser::class, 'users')
->andWhere('users.sync = :sync')
->setParameter('sync', $job->getSync())
->andWhere('users.role IN (:roles)')
->setParameter('roles', $roles)
->andWhere('users.sms IS NOT NULL')
->andWhere('users.sms <> \'\'')
->getQuery()
->getResult(SingleColumnHydrator::HYDRATOR)
)))),
AbstractRecipient::KINDS__SMS
)
);
// loop over the object to create the messages to process each fully
$messages = [];
$index = 0;
foreach ($contacts as $contact => $type) {
if ($index++ % AsyncMessage::MAX_BATCH === 0) {
$messages[] = [];
}
$messages[count($messages) - 1][] = new AsyncMessage(
$job,
OneRosterEvents::EVENT__PREPARE__ADHOC,
[
'job' => $job->getId(),
'event' => self::ADHOC_EVENTS__CONTACT_PREPARATION,
'payload' => [
'type' => $type,
'contact' => $contact,
],
],
self::MQ_PRIORITY,
);
}
// do only if we have messages
foreach ($messages as $batch) {
// tracking
$this->oneroster->orchestratePhaseChange($job, count($batch));
// queue them up
try {
$this->async->queue(
null,
$batch
);
} catch (\Exception $e) {
$this->oneroster->orchestratePhaseRevert($job, count($batch));
throw $e;
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'ADHOC_EVENTS__CONTACT_PREPARATION batch: %s',
count($batch)
));
}
// DEBUGGING
$event->getOutput()->writeln(sprintf(
'ADHOC_EVENTS__CONTACT_PREPARATION for %s contact(s) triggered for sync #%s',
array_sum(array_map('count', $messages)),
$job->getIdentifier()
));
}
/**
* @param OneRosterAdhocEvent $event
* @return void
*/
public function contactInjection(OneRosterAdhocEvent $event): void
{
// obtain the payload
$payload = $event->getPayload();
// grab the contact, already normalized
$contact = $payload['contact'];
// attempt to find an existing recipient
switch ($payload['type']) {
case AbstractRecipient::KINDS__EMAIL:
$recipient = $this->em->getRepository(EmailRecipient::class)->findOneBy([
'contact' => $contact,
]);
break;
case AbstractRecipient::KINDS__SMS:
case AbstractRecipient::KINDS__VOICE:
$recipient = $this->em->getRepository(PhoneRecipient::class)->findOneBy([
'contact' => $contact,
]);
break;
default:
throw new \Exception(sprintf(
'Cannot find existing contact of type "%s".',
$payload['type']
));
}
// if no recipient, need to make one
if ( ! $recipient) {
// make a fresh object and set specific things
switch ($payload['type']) {
case AbstractRecipient::KINDS__EMAIL:
$recipient = (new EmailRecipient());
break;
case AbstractRecipient::KINDS__SMS:
$recipient = (new PhoneRecipient())
->setMethod(PhoneRecipient::METHODS__SMS);
break;
case AbstractRecipient::KINDS__VOICE:
$recipient = (new PhoneRecipient())
->setMethod(PhoneRecipient::METHODS__VOICE);
break;
default:
throw new \Exception(sprintf(
'Cannot create contact of type "%s".',
$payload['type']
));
}
// set common things
$recipient
->setContact($contact)
->setStanding(Reachability::STANDINGS__UNKNOWN)
->setContactability(Reachability::CONTACTABILITIES__UNKNOWN)
->setReachability(Reachability::REACHABILITIES__OPTIMISTIC);
// need to save the recipient in the database
$this->em->save($recipient);
}
// TODO: set the id in the payload for easier retrieval later...
}
/**
* @param OneRosterAdhocEvent $event
* @return void
*/
public function contactLookup(OneRosterAdhocEvent $event): void
{
// TODO: use an id or something to make finding things faster
// skip this if we are flagged to not do this process
if ($event->getSync()->hasFlag(OneRosterSync::FLAGS__SKIP_TWILIO_LOOKUPS)) {
return;
}
// obtain the payload
$payload = $event->getPayload();
// only do for phone types
if ( ! in_array($payload['type'], [AbstractRecipient::KINDS__SMS, AbstractRecipient::KINDS__VOICE])) {
return;
}
// grab the contact, already normalized
$contact = $payload['contact'];
// attempt to find an existing recipient
$recipient = $this->em->getRepository(PhoneRecipient::class)->findOneBy([
'contact' => $contact,
]);
// if we do not find anything, we have a problem
if ( ! $recipient instanceof PhoneRecipient) {
throw new \Exception();
}
// decide if we should skip or not
switch (true) {
case $recipient->getLookupStatus() === PhoneRecipient::LOOKUPS__MISSING:
case $recipient->getLookupStatus() === PhoneRecipient::LOOKUPS__FAILED:
$check = true;
break;
case $recipient->getLookupStatus() === PhoneRecipient::LOOKUPS__SUCCESSFUL:
$check = false;
break;
default:
throw new \Exception();
}
// attempt to do the lookup at twilio if needed
if ($check) {
// wrap in try catch in case it fails
try {
// run the api call
$result = $this->twilio->fallback($recipient)->lookups->v1
->phoneNumbers($this->phones->normalize($recipient->getContact()))
// TWILIO API USAGE
// https://www.twilio.com/docs/lookup/v1-api#api-url
// GET https://lookups.twilio.com/v1/PhoneNumbers/{PhoneNumber}
->fetch([
'type' => ['carrier'],
]);
// update lookup cache
$recipient
->setLookup((static function (array $lookup) {
unset($lookup['url']);
unset($lookup['addOns']);
unset($lookup['callerName']);
return $lookup;
})($result->toArray()))
// TODO: need to check the contents to make sure we got good data?
->setLookupStatus(PhoneRecipient::LOOKUPS__SUCCESSFUL)
->setLookupTimestamp(DateTimeUtils::now());
} catch (\Exception $e) {
// mark as failed
$recipient
->setLookupStatus(PhoneRecipient::LOOKUPS__FAILED)
->setLookupTimestamp(DateTimeUtils::now());
// log an error
$this->oneroster->logIssue(
$event->getJob(),
OneRosterJob::PHASES__PREPARE,
self::ADHOC_EVENTS__CONTACT_PREPARATION,
PhoneRecipient::DISCR,
$recipient->getUidString(),
$e
);
}
}
// grab the lookup data
$lookup = $recipient->getLookup();
// set the type, prefer lookup information if available
if (is_array($lookup) && array_key_exists('carrier', $lookup) && is_array($lookup['carrier']) && array_key_exists('type', $lookup['carrier']) && isset($lookup['carrier']['type'])) {
switch ($lookup['carrier']['type']) {
case 'mobile':
$recipient->setType(PhoneRecipient::TYPES__MOBILE);
break;
case 'landline':
$recipient->setType(PhoneRecipient::TYPES__LANDLINE);
break;
case 'voip':
$recipient->setType(PhoneRecipient::TYPES__VOIP);
break;
}
}
// set the method, prefer lookup information if available
if (is_array($lookup) && array_key_exists('carrier', $lookup) && is_array($lookup['carrier']) && array_key_exists('type', $lookup['carrier']) && isset($lookup['carrier']['type'])) {
switch ($lookup['carrier']['type']) {
case 'mobile':
// TODO: should we use hybrid or sms here?
$recipient->setMethod(PhoneRecipient::METHODS__HYBRID);
break;
case 'landline':
case 'voip':
$recipient->setMethod(PhoneRecipient::METHODS__VOICE);
break;
}
}
// save changes
$this->em->save($recipient);
}
}