Skip to content

Commit

Permalink
Fixed broken test
Browse files Browse the repository at this point in the history
  • Loading branch information
petemcfarlane committed Feb 10, 2016
1 parent 6c2e707 commit 03860e5
Showing 1 changed file with 162 additions and 166 deletions.
328 changes: 162 additions & 166 deletions src/EventListener/RequestListener.php
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -22,178 +22,174 @@

namespace Uecode\Bundle\QPushBundle\EventListener;

use Psr\Log\LoggerInterface;
use Symfony\Component\HttpKernel\HttpKernel;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Event\GetResponseEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Uecode\Bundle\QPushBundle\Message\Notification;
use Symfony\Component\HttpKernel\HttpKernel;
use Uecode\Bundle\QPushBundle\Event\Events;
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
use Uecode\Bundle\QPushBundle\Message\Notification;

/**
* @author Keith Kirk <kkirk@undergroundelephant.com>
*/
class RequestListener
{
/**
* Symfony Event Dispatcher
*
* @var EventDispatcherInterface
*/
private $dispatcher;

/**
* Constructor.
*
* @param EventDispatcherInterface $dispatcher A Symfony Event Dispatcher
*/
public function __construct(EventDispatcherInterface $dispatcher)
{
$this->dispatcher = $dispatcher;
}

/**
* Kernel Request Event Handler for QPush Notifications
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
*/
public function onKernelRequest(GetResponseEvent $event)
{
if (HttpKernel::MASTER_REQUEST != $event->getRequestType()) {
return;
}

if ($event->getRequest()->headers->has('x-amz-sns-message-type')) {
$result = $this->handleSnsNotifications($event);
$event->setResponse(new Response($result, 200));
}

if ($event->getRequest()->headers->has('iron-message-id')) {
$result = $this->handleIronMqNotifications($event);
$event->setResponse(new Response($result, 200));
}
}

/**
* Handles Messages sent from a IronMQ Push Queue
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
* @return string|void
*/
private function handleIronMqNotifications(GetResponseEvent $event)
{
$headers = $event->getRequest()->headers;
$messageId = $headers->get('iron-message-id');

if (null === ($message = json_decode($event->getRequest()->getContent(), true))) {
throw new \InvalidArgumentException('Unable to decode JSON');
}

$queue = $this->getIronMqQueueName($event, $message);
$metadata = [
'iron-subscriber-message-id' => $headers->get('iron-subscriber-message-id'),
'iron-subscriber-message-url' => $headers->get('iron-subscriber-message-url')
];

$notification = new Notification(
$messageId,
$message,
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_MESSAGE, $notification)
);

return "IronMQ Notification Received.";
}

/**
* Handles Notifications sent from AWS SNS
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
* @return string
*/
private function handleSnsNotifications(GetResponseEvent $event)
{
$notification = json_decode((string)$event->getRequest()->getContent(), true);

$type = $event->getRequest()->headers->get('x-amz-sns-message-type');

$metadata = [
'Type' => $notification['Type'],
'TopicArn' => $notification['TopicArn'],
'Timestamp' => $notification['Timestamp'],
];

if ($type === 'Notification') {

// We put the queue name in the Subject field
$queue = $notification['Subject'];
$metadata['Subject'] = $queue;

$notification = new Notification(
$notification['MessageId'],
$notification['Message'],
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_MESSAGE, $notification)
);

return "SNS Message Notification Received.";
}

// For subscription notifications, we need to parse the Queue from
// the Topic ARN
$arnParts = explode(':', $notification['TopicArn']);
$last = end($arnParts);
$queue = str_replace('qpush_', '', $last);

// Get the token for the Subscription Confirmation
$metadata['Token'] = $notification['Token'];

$notification = new Notification(
$notification['MessageId'],
$notification['Message'],
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_SUBSCRIPTION, $notification)
);

return "SNS Subscription Confirmation Received.";
}

/**
* Get the name of the IronMq queue.
*
* @param GetResponseEvent $event
* @param array $message
*
* @return string
*/
private function getIronMqQueueName(GetResponseEvent $event, array &$message)
{
if (array_key_exists('_qpush_queue', $message)) {
return $message['_qpush_queue'];
} else if (null !== ($subscriberUrl = $event->getRequest()->headers->get('iron-subscriber-message-url'))) {
if (preg_match('#/queues/([a-z0-9_-]+)/messages/#i', $subscriberUrl, $matches)) {
$queue = $matches[1];
if (substr($queue, 0, 6) == 'qpush_') {
$queue = substr($queue, 6);
}

return $queue;
}
}

throw new \RuntimeException('Unable to get queue name');
}

class RequestListener {
/**
* Symfony Event Dispatcher
*
* @var EventDispatcherInterface
*/
private $dispatcher;

/**
* Constructor.
*
* @param EventDispatcherInterface $dispatcher A Symfony Event Dispatcher
*/
public function __construct(EventDispatcherInterface $dispatcher) {
$this->dispatcher = $dispatcher;
}

/**
* Kernel Request Event Handler for QPush Notifications
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
*/
public function onKernelRequest(GetResponseEvent $event) {
if (HttpKernel::MASTER_REQUEST != $event->getRequestType()) {
return;
}

if ($event->getRequest()->headers->has('x-amz-sns-message-type')) {
$result = $this->handleSnsNotifications($event);
$event->setResponse(new Response($result, 200));
}

if ($event->getRequest()->headers->has('iron-message-id')) {
$result = $this->handleIronMqNotifications($event);
$event->setResponse(new Response($result, 200));
}
}

/**
* Handles Messages sent from a IronMQ Push Queue
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
* @return string|void
*/
private function handleIronMqNotifications(GetResponseEvent $event) {
$headers = $event->getRequest()->headers;
$messageId = $headers->get('iron-message-id');

if (null === ($message = json_decode($event->getRequest()->getContent(), true))) {
throw new \InvalidArgumentException('Unable to decode JSON');
}

$queue = $this->getIronMqQueueName($event, $message);
$metadata = [
'iron-subscriber-message-id' => $headers->get('iron-subscriber-message-id'),
'iron-subscriber-message-url' => $headers->get('iron-subscriber-message-url')
];

unset($message['_qpush_queue']);

$notification = new Notification(
$messageId,
$message,
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_MESSAGE, $notification)
);

return "IronMQ Notification Received.";
}

/**
* Handles Notifications sent from AWS SNS
*
* @param GetResponseEvent $event The Kernel Request's GetResponseEvent
* @return string
*/
private function handleSnsNotifications(GetResponseEvent $event) {
$notification = json_decode((string) $event->getRequest()->getContent(), true);

$type = $event->getRequest()->headers->get('x-amz-sns-message-type');

$metadata = [
'Type' => $notification['Type'],
'TopicArn' => $notification['TopicArn'],
'Timestamp' => $notification['Timestamp'],
];

if ($type === 'Notification') {

// We put the queue name in the Subject field
$queue = $notification['Subject'];
$metadata['Subject'] = $queue;

$notification = new Notification(
$notification['MessageId'],
$notification['Message'],
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_MESSAGE, $notification)
);

return "SNS Message Notification Received.";
}

// For subscription notifications, we need to parse the Queue from
// the Topic ARN
$arnParts = explode(':', $notification['TopicArn']);
$last = end($arnParts);
$queue = str_replace('qpush_', '', $last);

// Get the token for the Subscription Confirmation
$metadata['Token'] = $notification['Token'];

$notification = new Notification(
$notification['MessageId'],
$notification['Message'],
$metadata
);

$this->dispatcher->dispatch(
Events::Notification($queue),
new NotificationEvent($queue, NotificationEvent::TYPE_SUBSCRIPTION, $notification)
);

return "SNS Subscription Confirmation Received.";
}

/**
* Get the name of the IronMq queue.
*
* @param GetResponseEvent $event
* @param array $message
*
* @return string
*/
private function getIronMqQueueName(GetResponseEvent $event, array&$message) {
if (array_key_exists('_qpush_queue', $message)) {
return $message['_qpush_queue'];
} else if (null !== ($subscriberUrl = $event->getRequest()->headers->get('iron-subscriber-message-url'))) {
if (preg_match('#/queues/([a-z0-9_-]+)/messages/#i', $subscriberUrl, $matches)) {
$queue = $matches[1];
if (substr($queue, 0, 6) == 'qpush_') {
$queue = substr($queue, 6);
}

return $queue;
}
}

throw new \RuntimeException('Unable to get queue name');
}
}

0 comments on commit 03860e5

Please sign in to comment.