Skip to content

Commit

Permalink
Merge pull request #61 from uecode/revert-54-master
Browse files Browse the repository at this point in the history
Merged
  • Loading branch information
k-k committed Jan 26, 2015
2 parents 9f70222 + 5cfa081 commit 932c1b0
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 32 deletions.
13 changes: 5 additions & 8 deletions src/Command/QueueReceiveCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,16 @@ private function pollQueue($registry, $name)
);
}

$provider = $registry->get($name);
$messages = $provider->receive();
$count = sizeof($messages);
foreach ($messages as $message) {

$messageEvent = new MessageEvent($name, $message);
$dispatcher = $this->getContainer()->get('event_dispatcher');
$dispatcher = $this->getContainer()->get('event_dispatcher');
$messages = $registry->get($name)->receive();

foreach ($messages as $message) {
$messageEvent = new MessageEvent($name, $message);
$dispatcher->dispatch(Events::Message($name), $messageEvent);
}

$msg = "<info>Finished polling %s Queue, %d messages fetched.</info>";
$this->output->writeln(sprintf($msg, $name, $count));
$this->output->writeln(sprintf($msg, $name, sizeof($messages)));

return 0;
}
Expand Down
8 changes: 4 additions & 4 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ private function getProvidersNode()

private function getQueuesNode()
{
$treeBuilder = new TreeBuilder();
$node = $treeBuilder->root('queues');
$treeBuilder = new TreeBuilder();
$node = $treeBuilder->root('queues');

$node
->requiresAtLeastOneElement()
Expand Down Expand Up @@ -187,8 +187,8 @@ private function getQueuesNode()

private function getSubscribersNode()
{
$treeBuilder = new TreeBuilder();
$node = $treeBuilder->root('subscribers');
$treeBuilder = new TreeBuilder();
$node = $treeBuilder->root('subscribers');

$node
->prototype('array')
Expand Down
15 changes: 11 additions & 4 deletions src/EventListener/RequestListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,22 @@ private function handleIronMqNotifications(GetResponseEvent $event)

// We add the message in an array with Queue as the property name
$message = json_decode($event->getRequest()->getContent(), true);
$queue = key($message);

if (empty($message['_qpush_queue'])) {
return;
}

$queue = $message['_qpush_queue'];
$metadata = [
'iron-subscriber-message-id' => $headers->get('iron-subscriber-message-id'),
'iron-subscriber-message-url' => $headers->get('iron-subscriber-message-url')
'iron-subscriber-message-id' => $headers->get('iron-subscriber-message-id'),
'iron-subscriber-message-url' => $headers->get('iron-subscriber-message-url')
];

unset($message['_qpush_queue']);

$notification = new Notification(
$messageId,
$message[$queue],
$message,
$metadata
);

Expand Down
8 changes: 5 additions & 3 deletions src/Provider/IronMqProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public function publish(array $message, array $options = [])

$result = $this->ironmq->postMessage(
$this->getNameWithPrefix(),
json_encode($message),
json_encode($message + ['_qpush_queue' => $this->name]),
[
'timeout' => $options['message_timeout'],
'delay' => $options['message_delay'],
Expand Down Expand Up @@ -184,14 +184,16 @@ public function receive(array $options = [])
// Convert to Message Class
foreach ($messages as &$message) {
$id = $message->id;
$body = $message->body;
$body = json_decode($message->body, true);
$metadata = [
'timeout' => $message->timeout,
'reserved_count' => $message->reserved_count,
'push_status' => $message->push_status
];

$message = new Message($id, $body, $metadata);
unset($body['_qpush_queue']);

$message = new Message($id, json_encode($body), $metadata);

$this->log(200, "Message has been received.", ['message_id' => $id]);
}
Expand Down
68 changes: 58 additions & 10 deletions tests/EventListener/RequestListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
use Symfony\Component\HttpKernel\KernelEvents;
use Symfony\Component\HttpKernel\Event\GetResponseEvent;
use Uecode\Bundle\QPushBundle\EventListener\RequestListener;
use Uecode\Bundle\QPushBundle\Event\Events as QPushEvents;
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;

/**
* @author Keith Kirk <kkirk@undergroundelephant.com>
Expand All @@ -47,11 +49,14 @@ class RequestListenerTest extends \PHPUnit_Framework_TestCase

public function setUp()
{
$listener = new RequestListener($this->getMock('Symfony\Component\EventDispatcher\EventDispatcherInterface'));
$this->dispatcher = new EventDispatcher('UTF-8');
$listener = new RequestListener($this->dispatcher);

$this->dispatcher->addListener(KernelEvents::REQUEST, [$listener, 'onKernelRequest']);
$this->dispatcher->addListener(QPushEvents::Notification('ironmq-test'), [$this, 'IronMqOnNotificationReceived']);
$this->dispatcher->addListener(QPushEvents::Notification('aws-test'), [$this, 'AwsOnNotificationReceived']);

$this->kernel = $this->getMock('Symfony\Component\HttpKernel\HttpKernelInterface');
$this->kernel = $this->getMock('Symfony\Component\HttpKernel\HttpKernelInterface');
}

public function testListenerDoesNothingForSubRequests()
Expand All @@ -64,9 +69,9 @@ public function testListenerDoesNothingForSubRequests()

public function testListenerHandlesIronMQMessageRequests()
{
$message = ['test' => '{"foo": "bar"}'];
$message = '{"foo": "bar","_qpush_queue":"ironmq-test"}';

$request = new Request([],[],[],[],[],[], json_encode($message));
$request = new Request([],[],[],[],[],[], $message);
$request->headers->set('iron-message-id', 123);
$request->headers->set('iron-subscriber-message-id', 456);
$request->headers->set('iron-subscriber-message-url', 'http://foo.bar');
Expand All @@ -78,27 +83,70 @@ public function testListenerHandlesIronMQMessageRequests()
$this->assertEquals("IronMQ Notification Received.", $event->getResponse()->getContent());
}

public function IronMqOnNotificationReceived(NotificationEvent $event)
{
$notification = $event->getNotification();
$this->assertInstanceOf('\Uecode\Bundle\QPushBundle\Message\Notification', $notification);

$this->assertEquals(123, $notification->getId());

$this->assertInternalType('array', $notification->getBody());
$this->assertEquals($notification->getBody(), ['foo' => 'bar']);

$this->assertInstanceOf('\Doctrine\Common\Collections\ArrayCollection', $notification->getMetadata());
$this->assertEquals(
[
'iron-subscriber-message-id' => 456,
'iron-subscriber-message-url' => 'http://foo.bar'
],
$notification->getMetadata()->toArray()
);
}

public function testListenerHandlesAwsNotificationRequests()
{
$message = [
'Type' => 'Notification',
'MessageId' => 123,
'TopicArn' => 'SomeArn',
'Subject' => 'Test',
'Message' => 'Test Message',
'Timestamp' => date('Y-m-d H:i:s')
'Subject' => 'aws-test',
'Message' => '{"foo": "bar"}',
'Timestamp' => date('Y-m-d H:i:s', 1422040603)
];

$request = new Request([],[],[],[],[],[], json_encode($message));
$request->headers->set('x-amz-sns-message-type', 'Notification');

$event = new GetResponseEvent($this->kernel, $request, HttpKernelInterface::MASTER_REQUEST);

$this->dispatcher->dispatch(KernelEvents::REQUEST, $event);

$this->assertTrue($event->hasResponse());
$this->assertEquals("SNS Message Notification Received.", $event->getResponse()->getContent());
}

public function AwsOnNotificationReceived(NotificationEvent $event)
{
$notification = $event->getNotification();
$this->assertInstanceOf('\Uecode\Bundle\QPushBundle\Message\Notification', $notification);

$this->assertEquals(123, $notification->getId());

$this->assertInternalType('array', $notification->getBody());
$this->assertEquals($notification->getBody(), ['foo' => 'bar']);

$this->assertInstanceOf('\Doctrine\Common\Collections\ArrayCollection', $notification->getMetadata());
$this->assertEquals(
[
'Type' => 'Notification',
'TopicArn' => 'SomeArn',
'Timestamp' => date('Y-m-d H:i:s', 1422040603),
'Subject' => 'aws-test'
],
$notification->getMetadata()->toArray()
);
}

public function testListenerHandlesAwsSubscriptionRequests()
{
$message = [
Expand All @@ -107,9 +155,9 @@ public function testListenerHandlesAwsSubscriptionRequests()
'Token' => 456,
'TopicArn' => 'SomeArn',
'SubscribeUrl' => 'http://foo.bar',
'Subject' => 'Test',
'Message' => 'Test Message',
'Timestamp' => date('Y-m-d H:i:s')
'Subject' => 'aws-test',
'Message' => '{"foo": "bar"}',
'Timestamp' => date('Y-m-d H:i:s', 1422040603)
];

$request = new Request([],[],[],[],[],[], json_encode($message));
Expand Down
4 changes: 2 additions & 2 deletions tests/MockClient/IronMqMockClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public function postMessage($queue, $message, $options)
{
$response = new \stdClass;
$response->id = 123;
$response->ids = [ 123 ];
$response->ids = [123];
$response->msg = "Messages put on queue.";

return $response;
Expand All @@ -74,7 +74,7 @@ public function getMessages($queue, $count, $timeout)
{
$response = new \stdClass;
$response->id = 123;
$response->body = '{"test":{"foo":"bar"}}';
$response->body = '{"foo":"bar","_qpush_queue":"test"}';
$response->timeout = 60;
$response->reserved_count = 1;
$response->push_status = new \stdClass;
Expand Down
5 changes: 4 additions & 1 deletion tests/Provider/IronMqProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ public function testPublish()

public function testReceive()
{
$this->assertTrue(is_array($this->provider->receive()));
$messages = $this->provider->receive();
$this->assertInternalType('array', $messages);
$this->assertEquals(['foo' => 'bar'], $messages[0]->getBody());
}

public function testDelete()
Expand All @@ -178,6 +180,7 @@ public function testOnNotification()
NotificationEvent::TYPE_MESSAGE,
new Notification(123, "test", [])
);

$event->setDispatcher(
$this->getMock('Symfony\Component\EventDispatcher\EventDispatcherInterface')
);
Expand Down

0 comments on commit 932c1b0

Please sign in to comment.